diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index fc51c74ab..0eb4f0c4a 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -88,7 +88,6 @@ func (w *tcpWorker) handleConnections(conns <-chan internet.Connection) { for { select { case conn := <-conns: - conn.SetReusable(false) conn.Close() default: break L @@ -166,12 +165,6 @@ func (*udpConn) SetWriteDeadline(time.Time) error { return nil } -func (*udpConn) Reusable() bool { - return false -} - -func (*udpConn) SetReusable(bool) {} - type udpWorker struct { sync.RWMutex diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 2e26c2570..7fa4d889c 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -191,11 +191,3 @@ func (v *Connection) SetReadDeadline(t time.Time) error { func (v *Connection) SetWriteDeadline(t time.Time) error { return nil } - -func (v *Connection) Reusable() bool { - return false -} - -func (v *Connection) SetReusable(bool) { - -} diff --git a/common/protocol/headers_test.go b/common/protocol/headers_test.go index ab7ad7951..5ecbdd8ab 100644 --- a/common/protocol/headers_test.go +++ b/common/protocol/headers_test.go @@ -16,8 +16,8 @@ func TestRequestOptionSet(t *testing.T) { option.Set(RequestOptionChunkStream) assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue() - option.Set(RequestOptionConnectionReuse) - assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue() + option.Set(RequestOptionChunkMasking) + assert.Bool(option.Has(RequestOptionChunkMasking)).IsTrue() assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue() } @@ -26,9 +26,9 @@ func TestRequestOptionClear(t *testing.T) { var option RequestOption option.Set(RequestOptionChunkStream) - option.Set(RequestOptionConnectionReuse) + option.Set(RequestOptionChunkMasking) option.Clear(RequestOptionChunkStream) assert.Bool(option.Has(RequestOptionChunkStream)).IsFalse() - assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue() + assert.Bool(option.Has(RequestOptionChunkMasking)).IsTrue() } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index eca4ceaca..9a45009b1 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -45,7 +45,6 @@ func (d *DokodemoDoor) Network() net.NetworkList { func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { log.Trace(errors.New("Dokodemo: processing connection from: ", conn.RemoteAddr()).AtDebug()) - conn.SetReusable(false) dest := net.Destination{ Network: network, Address: d.address, diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 260299501..45767e608 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -105,8 +105,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } defer conn.Close() - conn.SetReusable(false) - timeout := time.Second * time.Duration(v.timeout) if timeout == 0 { timeout = time.Minute * 5 diff --git a/proxy/http/server.go b/proxy/http/server.go index 46ed302e9..f546406d0 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -69,8 +69,6 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error } func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { - conn.SetReusable(false) - conn.SetReadDeadline(time.Now().Add(time.Second * 8)) reader := bufio.NewReaderSize(conn, 2048) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 0722872e9..08cf354ae 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -65,7 +65,6 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale log.Trace(errors.New("tunneling request to ", destination, " via ", server.Destination()).Path("Proxy", "Shadowsocks", "Client")) defer conn.Close() - conn.SetReusable(false) request := &protocol.RequestHeader{ Version: Version, diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index f0c0c66b0..2933cfc5d 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -60,8 +60,6 @@ func (s *Server) Network() net.NetworkList { } func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { - conn.SetReusable(false) - switch network { case net.Network_TCP: return s.handleConnection(ctx, conn, dispatcher) @@ -183,7 +181,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, responseDone := signal.ExecuteAsync(func() error { defer ray.InboundInput().Close() - if err := buf.PipeUntilEOF(timer, bodyReader, ray.InboundInput()); err != nil { + mergeReader := buf.NewMergingReader(bodyReader) + if err := buf.PipeUntilEOF(timer, mergeReader, ray.InboundInput()); err != nil { return errors.New("failed to transport all TCP request").Base(err).Path("Shadowsocks", "Server") } return nil diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 735a258b7..2efeb90a0 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -59,7 +59,6 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. } defer conn.Close() - conn.SetReusable(false) request := &protocol.RequestHeader{ Version: socks5Version, diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 8b236471e..44eea4445 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -48,8 +48,6 @@ func (s *Server) Network() net.NetworkList { } func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { - conn.SetReusable(false) - switch network { case net.Network_TCP: return s.processTCP(ctx, conn, dispatcher) diff --git a/proxy/vmess/encoding/encoding_test.go b/proxy/vmess/encoding/encoding_test.go index f92ea99f5..452d9a6bd 100644 --- a/proxy/vmess/encoding/encoding_test.go +++ b/proxy/vmess/encoding/encoding_test.go @@ -31,7 +31,6 @@ func TestRequestSerialization(t *testing.T) { Version: 1, User: user, Command: protocol.RequestCommandTCP, - Option: protocol.RequestOptionConnectionReuse, Address: v2net.DomainAddress("www.v2ray.com"), Port: v2net.Port(443), Security: protocol.Security(protocol.SecurityType_AES128_GCM), diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 8a7fdc624..3280aaeeb 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -138,8 +138,9 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio bodyWriter := session.EncodeResponseBody(request, output) + mergeReader := buf.NewMergingReader(input) // Optimize for small response packet - data, err := input.Read() + data, err := mergeReader.Read() if err != nil { return err } @@ -155,7 +156,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio } } - if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil { + if err := buf.PipeUntilEOF(timer, mergeReader, bodyWriter); err != nil { return err } @@ -181,7 +182,6 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i log.Access(connection.RemoteAddr(), "", log.AccessRejected, err) log.Trace(errors.New("VMess|Inbound: Invalid request from ", connection.RemoteAddr(), ": ", err)) } - connection.SetReusable(false) return err } log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "") @@ -189,7 +189,6 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i connection.SetReadDeadline(time.Time{}) - connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) userSettings := request.User.GetSettings() ctx = protocol.ContextWithUser(ctx, request.User) @@ -214,24 +213,18 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i Command: v.generateCommand(ctx, request), } - if connection.Reusable() { - response.Option.Set(protocol.ResponseOptionConnectionReuse) - } - responseDone := signal.ExecuteAsync(func() error { return transferResponse(timer, session, request, response, output, writer) }) if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { - connection.SetReusable(false) input.CloseError() output.CloseError() - return errors.New("error during processing").Base(err).Path("VMess", "Inbound") + return errors.New("error during processing").Base(err).Path("Proxy", "VMess", "Inbound") } if err := writer.Flush(); err != nil { - connection.SetReusable(false) - return errors.New("error during flushing remaining data").Base(err).Path("VMess", "Inbound") + return errors.New("error during flushing remaining data").Base(err).Path("Proxy", "VMess", "Inbound") } runtime.KeepAlive(timer) diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 9931a5455..6a466d4da 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -95,11 +95,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial request.Option.Set(protocol.RequestOptionChunkMasking) } - conn.SetReusable(true) - if conn.Reusable() { // Conn reuse may be disabled on transportation layer - request.Option.Set(protocol.RequestOptionConnectionReuse) - } - input := outboundRay.OutboundInput() output := outboundRay.OutboundOutput() @@ -154,8 +149,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } v.handleCommand(rec.Destination(), header.Command) - conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse)) - reader.SetBuffered(false) bodyReader := session.DecodeResponseBody(request, reader) if err := buf.PipeUntilEOF(timer, bodyReader, output); err != nil { @@ -166,7 +159,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial }) if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { - conn.SetReusable(false) return errors.New("connection ends").Base(err).Path("VMess", "Outbound") } runtime.KeepAlive(timer) diff --git a/testing/scenarios/tls_test.go b/testing/scenarios/tls_test.go index faff65a94..2059fbcc8 100644 --- a/testing/scenarios/tls_test.go +++ b/testing/scenarios/tls_test.go @@ -341,11 +341,7 @@ func TestTLSOverWebSocket(t *testing.T) { TransportSettings: []*internet.TransportConfig{ { Protocol: internet.TransportProtocol_WebSocket, - Settings: serial.ToTypedMessage(&websocket.Config{ - ConnectionReuse: &websocket.ConnectionReuse{ - Enable: false, - }, - }), + Settings: serial.ToTypedMessage(&websocket.Config{}), }, }, SecurityType: serial.GetMessageType(&tls.Config{}), @@ -381,136 +377,3 @@ func TestTLSOverWebSocket(t *testing.T) { CloseAllServers() } - -func TestTLSConnectionReuse(t *testing.T) { - assert := assert.On(t) - - tcpServer := tcp.Server{ - MsgProcessor: xor, - } - dest, err := tcpServer.Start() - assert.Error(err).IsNil() - defer tcpServer.Close() - - userID := protocol.NewID(uuid.New()) - serverPort := pickPort() - serverConfig := &core.Config{ - Inbound: []*proxyman.InboundHandlerConfig{ - { - ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ - PortRange: v2net.SinglePortRange(serverPort), - Listen: v2net.NewIPOrDomain(v2net.LocalHostIP), - StreamSettings: &internet.StreamConfig{ - SecurityType: serial.GetMessageType(&tls.Config{}), - SecuritySettings: []*serial.TypedMessage{ - serial.ToTypedMessage(&tls.Config{ - Certificate: []*tls.Certificate{tlsgen.GenerateCertificateForTest()}, - }), - }, - }, - }), - ProxySettings: serial.ToTypedMessage(&inbound.Config{ - User: []*protocol.User{ - { - Account: serial.ToTypedMessage(&vmess.Account{ - Id: userID.String(), - }), - }, - }, - }), - }, - }, - Outbound: []*proxyman.OutboundHandlerConfig{ - { - ProxySettings: serial.ToTypedMessage(&freedom.Config{}), - }, - }, - } - - clientPort := pickPort() - clientConfig := &core.Config{ - Inbound: []*proxyman.InboundHandlerConfig{ - { - ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ - PortRange: v2net.SinglePortRange(clientPort), - Listen: v2net.NewIPOrDomain(v2net.LocalHostIP), - }), - ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ - Address: v2net.NewIPOrDomain(dest.Address), - Port: uint32(dest.Port), - NetworkList: &v2net.NetworkList{ - Network: []v2net.Network{v2net.Network_TCP}, - }, - }), - }, - }, - Outbound: []*proxyman.OutboundHandlerConfig{ - { - ProxySettings: serial.ToTypedMessage(&outbound.Config{ - Receiver: []*protocol.ServerEndpoint{ - { - Address: v2net.NewIPOrDomain(v2net.LocalHostIP), - Port: uint32(serverPort), - User: []*protocol.User{ - { - Account: serial.ToTypedMessage(&vmess.Account{ - Id: userID.String(), - }), - }, - }, - }, - }, - }), - SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{ - StreamSettings: &internet.StreamConfig{ - SecurityType: serial.GetMessageType(&tls.Config{}), - SecuritySettings: []*serial.TypedMessage{ - serial.ToTypedMessage(&tls.Config{ - AllowInsecure: true, - }), - }, - }, - }), - }, - }, - } - - assert.Error(InitializeServerConfig(serverConfig)).IsNil() - assert.Error(InitializeServerConfig(clientConfig)).IsNil() - - for i := 0; i < 5; i++ { - conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: int(clientPort), - }) - assert.Error(err).IsNil() - - payload := "dokodemo request." - nBytes, err := conn.Write([]byte(payload)) - assert.Error(err).IsNil() - assert.Int(nBytes).Equals(len(payload)) - - response := readFrom(conn, time.Second*2, len(payload)) - assert.Bytes(response).Equals(xor([]byte(payload))) - assert.Error(conn.Close()).IsNil() - } - - time.Sleep(time.Second * 10) - - conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: int(clientPort), - }) - assert.Error(err).IsNil() - - payload := "dokodemo request." - nBytes, err := conn.Write([]byte(payload)) - assert.Error(err).IsNil() - assert.Int(nBytes).Equals(len(payload)) - - response := readFrom(conn, time.Second*2, len(payload)) - assert.Bytes(response).Equals(xor([]byte(payload))) - assert.Error(conn.Close()).IsNil() - - CloseAllServers() -} diff --git a/tools/conf/transport_internet.go b/tools/conf/transport_internet.go index 38d2a0758..2ec2423ee 100644 --- a/tools/conf/transport_internet.go +++ b/tools/conf/transport_internet.go @@ -97,17 +97,11 @@ func (v *KCPConfig) Build() (*serial.TypedMessage, error) { } type TCPConfig struct { - ConnectionReuse *bool `json:"connectionReuse"` - HeaderConfig json.RawMessage `json:"header"` + HeaderConfig json.RawMessage `json:"header"` } func (v *TCPConfig) Build() (*serial.TypedMessage, error) { config := new(tcp.Config) - if v.ConnectionReuse != nil { - config.ConnectionReuse = &tcp.ConnectionReuse{ - Enable: *v.ConnectionReuse, - } - } if len(v.HeaderConfig) > 0 { headerConfig, _, err := tcpHeaderLoader.Load(v.HeaderConfig) if err != nil { @@ -124,19 +118,13 @@ func (v *TCPConfig) Build() (*serial.TypedMessage, error) { } type WebSocketConfig struct { - ConnectionReuse *bool `json:"connectionReuse"` - Path string `json:"Path"` + Path string `json:"Path"` } func (v *WebSocketConfig) Build() (*serial.TypedMessage, error) { config := &websocket.Config{ Path: v.Path, } - if v.ConnectionReuse != nil { - config.ConnectionReuse = &websocket.ConnectionReuse{ - Enable: *v.ConnectionReuse, - } - } return serial.ToTypedMessage(config), nil } diff --git a/tools/conf/transport_test.go b/tools/conf/transport_test.go index 280565aec..b2f1bf3f4 100644 --- a/tools/conf/transport_test.go +++ b/tools/conf/transport_test.go @@ -19,7 +19,6 @@ func TestTransportConfig(t *testing.T) { rawJson := `{ "tcpSettings": { - "connectionReuse": true, "header": { "type": "http", "request": { @@ -64,7 +63,6 @@ func TestTransportConfig(t *testing.T) { case *tcp.Config: settingsCount++ assert.Bool(settingsWithProtocol.Protocol == internet.TransportProtocol_TCP).IsTrue() - assert.Bool(settings.IsConnectionReuse()).IsTrue() rawHeader, err := settings.HeaderSettings.GetInstance() assert.Error(err).IsNil() header := rawHeader.(*http.Config) diff --git a/transport/internet/connection.go b/transport/internet/connection.go index f0f33cc15..b526f7469 100644 --- a/transport/internet/connection.go +++ b/transport/internet/connection.go @@ -6,14 +6,8 @@ import ( type ConnectionHandler func(Connection) -type Reusable interface { - Reusable() bool - SetReusable(reuse bool) -} - type Connection interface { net.Conn - Reusable } type SysFd interface { diff --git a/transport/internet/internal/connection.go b/transport/internet/internal/connection.go deleted file mode 100644 index 8dcb55cbc..000000000 --- a/transport/internet/internal/connection.go +++ /dev/null @@ -1,181 +0,0 @@ -package internal - -import ( - "io" - "net" - "sync" - "time" - - v2net "v2ray.com/core/common/net" -) - -// ConnectionID is the ID of a connection. -type ConnectionID struct { - Local v2net.Address - Remote v2net.Address - RemotePort v2net.Port -} - -// NewConnectionID creates a new ConnectionId. -func NewConnectionID(source v2net.Address, dest v2net.Destination) ConnectionID { - return ConnectionID{ - Local: source, - Remote: dest.Address, - RemotePort: dest.Port, - } -} - -// Reuser determines whether a connection can be reused or not. -type Reuser struct { - // userEnabled indicates connection-reuse enabled by user. - userEnabled bool - // appEnabled indicates connection-reuse enabled by app. - appEnabled bool -} - -// ReuseConnection returns a tracker for tracking connection reusability. -func ReuseConnection(reuse bool) *Reuser { - return &Reuser{ - userEnabled: reuse, - appEnabled: reuse, - } -} - -// Connection is an implementation of net.Conn with re-usability. -type Connection struct { - sync.RWMutex - id ConnectionID - conn net.Conn - listener ConnectionRecyler - reuser *Reuser -} - -// NewConnection creates a new connection. -func NewConnection(id ConnectionID, conn net.Conn, manager ConnectionRecyler, reuser *Reuser) *Connection { - return &Connection{ - id: id, - conn: conn, - listener: manager, - reuser: reuser, - } -} - -// Read implements net.Conn.Read(). -func (v *Connection) Read(b []byte) (int, error) { - conn := v.underlyingConn() - if conn == nil { - return 0, io.EOF - } - - return conn.Read(b) -} - -// Write implement net.Conn.Write(). -func (v *Connection) Write(b []byte) (int, error) { - conn := v.underlyingConn() - if conn == nil { - return 0, io.ErrClosedPipe - } - return conn.Write(b) -} - -// Close implements net.Conn.Close(). If the connection is reusable, the underlying connection will be recycled. -func (v *Connection) Close() error { - if v == nil { - return io.ErrClosedPipe - } - - v.Lock() - defer v.Unlock() - if v.conn == nil { - return io.ErrClosedPipe - } - if v.Reusable() { - v.listener.Put(v.id, v.conn) - return nil - } - err := v.conn.Close() - v.conn = nil - return err -} - -// LocalAddr implements net.Conn.LocalAddr(). -func (v *Connection) LocalAddr() net.Addr { - conn := v.underlyingConn() - if conn == nil { - return nil - } - return conn.LocalAddr() -} - -// RemoteAddr implements net.Conn.RemoteAddr(). -func (v *Connection) RemoteAddr() net.Addr { - conn := v.underlyingConn() - if conn == nil { - return nil - } - return conn.RemoteAddr() -} - -// SetDeadline implements net.Conn.SetDeadline(). -func (v *Connection) SetDeadline(t time.Time) error { - conn := v.underlyingConn() - if conn == nil { - return nil - } - return conn.SetDeadline(t) -} - -// SetReadDeadline implements net.Conn.SetReadDeadline(). -func (v *Connection) SetReadDeadline(t time.Time) error { - conn := v.underlyingConn() - if conn == nil { - return nil - } - return conn.SetReadDeadline(t) -} - -// SetWriteDeadline implements net.Conn.SetWriteDeadline(). -func (v *Connection) SetWriteDeadline(t time.Time) error { - conn := v.underlyingConn() - if conn == nil { - return nil - } - return conn.SetWriteDeadline(t) -} - -// SetReusable implements internet.Reusable.SetReusable(). -func (v *Connection) SetReusable(reusable bool) { - if v == nil { - return - } - v.reuser.appEnabled = reusable -} - -// Reusable implements internet.Reusable.Reusable(). -func (v *Connection) Reusable() bool { - if v == nil { - return false - } - return v.reuser.userEnabled && v.reuser.appEnabled -} - -// SysFd implement internet.SysFd.SysFd(). -func (v *Connection) SysFd() (int, error) { - conn := v.underlyingConn() - if conn == nil { - return 0, io.ErrClosedPipe - } - return GetSysFd(conn) -} - -func (v *Connection) underlyingConn() net.Conn { - if v == nil { - return nil - } - - v.RLock() - defer v.RUnlock() - - return v.conn -} diff --git a/transport/internet/internal/pool.go b/transport/internet/internal/pool.go deleted file mode 100644 index 5c84ec837..000000000 --- a/transport/internet/internal/pool.go +++ /dev/null @@ -1,134 +0,0 @@ -package internal - -import ( - "net" - "sync" - "time" - - "v2ray.com/core/common/signal" -) - -// ConnectionRecyler is the interface for recycling connections. -type ConnectionRecyler interface { - // Put returns a connection back to a connection pool. - Put(ConnectionID, net.Conn) -} - -type NoOpConnectionRecyler struct{} - -func (NoOpConnectionRecyler) Put(ConnectionID, net.Conn) {} - -// ExpiringConnection is a connection that will expire in certain time. -type ExpiringConnection struct { - conn net.Conn - expire time.Time -} - -// Expired returns true if the connection has expired. -func (ec *ExpiringConnection) Expired() bool { - return ec.expire.Before(time.Now()) -} - -// Pool is a connection pool. -type Pool struct { - sync.RWMutex - connsByDest map[ConnectionID][]*ExpiringConnection - cleanupToken *signal.Semaphore -} - -// NewConnectionPool creates a new Pool. -func NewConnectionPool() *Pool { - p := &Pool{ - connsByDest: make(map[ConnectionID][]*ExpiringConnection), - cleanupToken: signal.NewSemaphore(1), - } - return p -} - -// Get returns a connection with matching connection ID. Nil if not found. -func (p *Pool) Get(id ConnectionID) net.Conn { - p.Lock() - defer p.Unlock() - - list, found := p.connsByDest[id] - if !found { - return nil - } - connIdx := -1 - for idx, conn := range list { - if !conn.Expired() { - connIdx = idx - break - } - } - if connIdx == -1 { - return nil - } - listLen := len(list) - conn := list[connIdx] - if connIdx != listLen-1 { - list[connIdx] = list[listLen-1] - } - list = list[:listLen-1] - p.connsByDest[id] = list - return conn.conn -} - -func (p *Pool) isEmpty() bool { - p.RLock() - defer p.RUnlock() - - return len(p.connsByDest) == 0 -} - -func (p *Pool) cleanup() { - defer p.cleanupToken.Signal() - - for !p.isEmpty() { - time.Sleep(time.Second * 5) - expiredConns := make([]net.Conn, 0, 16) - p.Lock() - for dest, list := range p.connsByDest { - validConns := make([]*ExpiringConnection, 0, len(list)) - for _, conn := range list { - if conn.Expired() { - expiredConns = append(expiredConns, conn.conn) - } else { - validConns = append(validConns, conn) - } - } - if len(validConns) != len(list) { - p.connsByDest[dest] = validConns - } - } - p.Unlock() - for _, conn := range expiredConns { - conn.Close() - } - } -} - -// Put implements ConnectionRecyler.Put(). -func (p *Pool) Put(id ConnectionID, conn net.Conn) { - expiringConn := &ExpiringConnection{ - conn: conn, - expire: time.Now().Add(time.Second * 4), - } - - p.Lock() - defer p.Unlock() - - list, found := p.connsByDest[id] - if !found { - list = []*ExpiringConnection{expiringConn} - } else { - list = append(list, expiringConn) - } - p.connsByDest[id] = list - - select { - case <-p.cleanupToken.Wait(): - go p.cleanup() - default: - } -} diff --git a/transport/internet/internal/pool_test.go b/transport/internet/internal/pool_test.go deleted file mode 100644 index ceec14b51..000000000 --- a/transport/internet/internal/pool_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package internal_test - -import ( - "net" - "testing" - "time" - - v2net "v2ray.com/core/common/net" - "v2ray.com/core/testing/assert" - . "v2ray.com/core/transport/internet/internal" -) - -type TestConnection struct { - id string - closed bool -} - -func (o *TestConnection) Read([]byte) (int, error) { - return 0, nil -} - -func (o *TestConnection) Write([]byte) (int, error) { - return 0, nil -} - -func (o *TestConnection) Close() error { - o.closed = true - return nil -} - -func (o *TestConnection) LocalAddr() net.Addr { - return nil -} - -func (o *TestConnection) RemoteAddr() net.Addr { - return nil -} - -func (o *TestConnection) SetDeadline(t time.Time) error { - return nil -} - -func (o *TestConnection) SetReadDeadline(t time.Time) error { - return nil -} - -func (o *TestConnection) SetWriteDeadline(t time.Time) error { - return nil -} - -func TestConnectionCache(t *testing.T) { - assert := assert.On(t) - - pool := NewConnectionPool() - conn := pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80)))) - assert.Pointer(conn).IsNil() - - pool.Put(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), &TestConnection{id: "test"}) - conn = pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80)))) - assert.String(conn.(*TestConnection).id).Equals("test") -} - -func TestConnectionRecycle(t *testing.T) { - assert := assert.On(t) - - pool := NewConnectionPool() - c := &TestConnection{id: "test"} - pool.Put(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), c) - time.Sleep(6 * time.Second) - assert.Bool(c.closed).IsTrue() - conn := pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80)))) - assert.Pointer(conn).IsNil() -} diff --git a/transport/internet/kcp/config.go b/transport/internet/kcp/config.go index b0131fff9..606df8ea4 100644 --- a/transport/internet/kcp/config.go +++ b/transport/internet/kcp/config.go @@ -96,13 +96,6 @@ func (v *Config) GetReceivingBufferSize() uint32 { return v.GetReadBufferSize() / v.GetMTUValue() } -func (v *Config) IsConnectionReuse() bool { - if v == nil || v.ConnectionReuse == nil { - return true - } - return v.ConnectionReuse.Enable -} - func init() { common.Must(internet.RegisterProtocolConfigCreator(internet.TransportProtocol_MKCP, func() interface{} { return new(Config) diff --git a/transport/internet/kcp/config.pb.go b/transport/internet/kcp/config.pb.go index e7b4c5544..1c0811660 100644 --- a/transport/internet/kcp/config.pb.go +++ b/transport/internet/kcp/config.pb.go @@ -143,7 +143,6 @@ type Config struct { WriteBuffer *WriteBuffer `protobuf:"bytes,6,opt,name=write_buffer,json=writeBuffer" json:"write_buffer,omitempty"` ReadBuffer *ReadBuffer `protobuf:"bytes,7,opt,name=read_buffer,json=readBuffer" json:"read_buffer,omitempty"` HeaderConfig *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,8,opt,name=header_config,json=headerConfig" json:"header_config,omitempty"` - ConnectionReuse *ConnectionReuse `protobuf:"bytes,9,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"` } func (m *Config) Reset() { *m = Config{} } @@ -207,13 +206,6 @@ func (m *Config) GetHeaderConfig() *v2ray_core_common_serial.TypedMessage { return nil } -func (m *Config) GetConnectionReuse() *ConnectionReuse { - if m != nil { - return m.ConnectionReuse - } - return nil -} - func init() { proto.RegisterType((*MTU)(nil), "v2ray.core.transport.internet.kcp.MTU") proto.RegisterType((*TTI)(nil), "v2ray.core.transport.internet.kcp.TTI") @@ -228,36 +220,35 @@ func init() { func init() { proto.RegisterFile("v2ray.com/core/transport/internet/kcp/config.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 487 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0x5f, 0x6f, 0xd3, 0x30, - 0x14, 0xc5, 0xd5, 0x75, 0x2d, 0xe3, 0x76, 0x5b, 0x4b, 0x84, 0x50, 0x04, 0x12, 0x5a, 0x2b, 0x31, - 0x8d, 0x07, 0x1c, 0xc8, 0x5e, 0x78, 0x5e, 0x79, 0xa9, 0xa6, 0x22, 0xb0, 0x52, 0x90, 0x26, 0xa1, - 0xe0, 0x3a, 0xb7, 0x25, 0x6a, 0x63, 0x47, 0x8e, 0xb3, 0xaa, 0x7c, 0x23, 0xf8, 0x94, 0xc8, 0x4e, - 0xd3, 0x7f, 0x68, 0x6b, 0xde, 0x6a, 0xdf, 0x73, 0x7f, 0xae, 0xce, 0x3d, 0x37, 0xe0, 0xdf, 0xfb, - 0x8a, 0x2d, 0x09, 0x97, 0x89, 0xc7, 0xa5, 0x42, 0x4f, 0x2b, 0x26, 0xb2, 0x54, 0x2a, 0xed, 0xc5, - 0x42, 0xa3, 0x12, 0xa8, 0xbd, 0x19, 0x4f, 0x3d, 0x2e, 0xc5, 0x24, 0x9e, 0x92, 0x54, 0x49, 0x2d, - 0x9d, 0x6e, 0xd9, 0xa3, 0x90, 0xac, 0xf5, 0xa4, 0xd4, 0x93, 0x19, 0x4f, 0x5f, 0xbe, 0xdf, 0xc3, - 0x72, 0x99, 0x24, 0x52, 0x78, 0x19, 0xaa, 0x98, 0xcd, 0x3d, 0xbd, 0x4c, 0x31, 0x0a, 0x13, 0xcc, - 0x32, 0x36, 0xc5, 0x02, 0xda, 0x7b, 0x05, 0xf5, 0x61, 0x30, 0x72, 0x9e, 0x43, 0xe3, 0x9e, 0xcd, - 0x73, 0x74, 0x6b, 0x17, 0xb5, 0xab, 0x33, 0x5a, 0x1c, 0x4c, 0x31, 0x08, 0x06, 0x0f, 0x14, 0x2f, - 0xe1, 0x7c, 0x94, 0xce, 0x63, 0x31, 0xeb, 0xb3, 0x94, 0xf1, 0x58, 0x2f, 0x1f, 0xd0, 0x5d, 0x41, - 0xe7, 0x93, 0x5c, 0x88, 0x0a, 0xca, 0x2e, 0xb4, 0xbe, 0xab, 0x58, 0xe3, 0x4d, 0x3e, 0x99, 0xa0, - 0x72, 0x1c, 0x38, 0xce, 0xe2, 0xdf, 0xa5, 0xc6, 0xfe, 0xee, 0x5d, 0x00, 0x50, 0x64, 0xd1, 0x23, - 0x8a, 0xb7, 0xd0, 0xee, 0x4b, 0x21, 0x90, 0xeb, 0x58, 0x0a, 0x8a, 0x79, 0x86, 0xce, 0x0b, 0x68, - 0xa2, 0x60, 0xe3, 0x79, 0x21, 0x3c, 0xa1, 0xab, 0x53, 0xef, 0x4f, 0x03, 0x9a, 0x7d, 0xeb, 0xb0, - 0xf3, 0x11, 0xea, 0x89, 0xce, 0x6d, 0xbd, 0xe5, 0x5f, 0x92, 0x83, 0x4e, 0x93, 0x61, 0x30, 0xa2, - 0xa6, 0xc5, 0x74, 0x6a, 0x1d, 0xbb, 0x47, 0x95, 0x3b, 0x83, 0x60, 0x40, 0x4d, 0x8b, 0x73, 0x07, - 0xed, 0xdc, 0x1a, 0x18, 0xf2, 0x95, 0x2f, 0x6e, 0xdd, 0x52, 0x3e, 0x54, 0xa0, 0xec, 0x5a, 0x4f, - 0xcf, 0xf3, 0xdd, 0x51, 0xfc, 0x84, 0x67, 0xd1, 0xca, 0xf4, 0x0d, 0xfd, 0xd8, 0xd2, 0xaf, 0x2b, - 0xd0, 0xf7, 0x07, 0x46, 0x3b, 0xd1, 0xfe, 0x08, 0x5f, 0x03, 0x70, 0x29, 0xa6, 0x98, 0x19, 0x9f, - 0xdd, 0x86, 0x35, 0x76, 0xeb, 0xc6, 0xf9, 0x0a, 0xa7, 0x0b, 0x33, 0xcc, 0x70, 0x6c, 0x67, 0xe5, - 0x36, 0xed, 0xe3, 0xa4, 0xc2, 0xe3, 0x5b, 0x19, 0xa0, 0xad, 0xc5, 0x56, 0x20, 0x3e, 0x43, 0x4b, - 0x21, 0x8b, 0x4a, 0xe2, 0x13, 0x4b, 0x7c, 0x57, 0x81, 0xb8, 0x89, 0x0c, 0x05, 0xb5, 0x89, 0xcf, - 0x2d, 0x9c, 0xfd, 0x42, 0x16, 0xa1, 0x0a, 0x8b, 0x3d, 0x73, 0x4f, 0xfe, 0x1f, 0x62, 0xb1, 0x41, - 0xa4, 0xd8, 0x20, 0x12, 0x98, 0x0d, 0x1a, 0x16, 0x0b, 0x44, 0x4f, 0x8b, 0xe6, 0x55, 0x82, 0x7e, - 0x40, 0x87, 0xaf, 0x73, 0x17, 0x2a, 0x13, 0x3c, 0xf7, 0xa9, 0xe5, 0xf9, 0x15, 0xfe, 0xe1, 0x5e, - 0x64, 0x69, 0x9b, 0xef, 0x5e, 0xdc, 0x50, 0x78, 0xc3, 0x65, 0x72, 0x98, 0xf4, 0xa5, 0x76, 0x57, - 0x9f, 0xf1, 0xf4, 0xef, 0x51, 0xf7, 0x9b, 0x4f, 0xd9, 0x92, 0xf4, 0x8d, 0x34, 0x58, 0x4b, 0x07, - 0xa5, 0xf4, 0x96, 0xa7, 0xe3, 0xa6, 0xfd, 0x04, 0x5c, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x1d, - 0x04, 0x18, 0x2b, 0x8d, 0x04, 0x00, 0x00, + // 471 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0x5f, 0x6f, 0xd3, 0x3e, + 0x14, 0x55, 0xd7, 0xae, 0xbf, 0xfe, 0x6e, 0xf7, 0xa7, 0x44, 0x08, 0x45, 0x20, 0xa1, 0xb5, 0x12, + 0xd3, 0x78, 0xc0, 0x81, 0xee, 0x85, 0xe7, 0x95, 0x97, 0x32, 0x15, 0x81, 0x95, 0x82, 0xb4, 0x97, + 0xe2, 0x3a, 0xb7, 0xc5, 0x6a, 0x63, 0x5b, 0x8e, 0xb3, 0xaa, 0x7c, 0x24, 0x3e, 0x0d, 0x1f, 0x09, + 0xc5, 0x6e, 0xd6, 0xae, 0x68, 0x2c, 0x6f, 0x71, 0xee, 0x39, 0xc7, 0xd6, 0x39, 0xf7, 0x40, 0xff, + 0xb6, 0x6f, 0xd8, 0x9a, 0x70, 0x95, 0x46, 0x5c, 0x19, 0x8c, 0xac, 0x61, 0x32, 0xd3, 0xca, 0xd8, + 0x48, 0x48, 0x8b, 0x46, 0xa2, 0x8d, 0x16, 0x5c, 0x47, 0x5c, 0xc9, 0x99, 0x98, 0x13, 0x6d, 0x94, + 0x55, 0x41, 0xb7, 0xe4, 0x18, 0x24, 0x77, 0x78, 0x52, 0xe2, 0xc9, 0x82, 0xeb, 0xe7, 0x6f, 0xf7, + 0x64, 0xb9, 0x4a, 0x53, 0x25, 0xa3, 0x0c, 0x8d, 0x60, 0xcb, 0xc8, 0xae, 0x35, 0x26, 0x93, 0x14, + 0xb3, 0x8c, 0xcd, 0xd1, 0x8b, 0xf6, 0x5e, 0x40, 0x7d, 0x14, 0x8f, 0x83, 0xa7, 0x70, 0x78, 0xcb, + 0x96, 0x39, 0x86, 0xb5, 0xb3, 0xda, 0xc5, 0x31, 0xf5, 0x87, 0x62, 0x18, 0xc7, 0xc3, 0x07, 0x86, + 0xe7, 0x70, 0x32, 0xd6, 0x4b, 0x21, 0x17, 0x03, 0xa6, 0x19, 0x17, 0x76, 0xfd, 0x00, 0xee, 0x02, + 0x3a, 0x1f, 0xd4, 0x4a, 0x56, 0x40, 0x76, 0xa1, 0xfd, 0xcd, 0x08, 0x8b, 0x57, 0xf9, 0x6c, 0x86, + 0x26, 0x08, 0xa0, 0x91, 0x89, 0x9f, 0x25, 0xc6, 0x7d, 0xf7, 0xce, 0x00, 0x28, 0xb2, 0xe4, 0x1f, + 0x88, 0xd7, 0x70, 0x3a, 0x50, 0x52, 0x22, 0xb7, 0x42, 0x49, 0x8a, 0x79, 0x86, 0xc1, 0x33, 0x68, + 0xa2, 0x64, 0xd3, 0xa5, 0x07, 0xb6, 0xe8, 0xe6, 0xd4, 0xfb, 0xdd, 0x80, 0xe6, 0xc0, 0x39, 0x1c, + 0xbc, 0x87, 0x7a, 0x6a, 0x73, 0x37, 0x6f, 0xf7, 0xcf, 0xc9, 0xa3, 0x4e, 0x93, 0x51, 0x3c, 0xa6, + 0x05, 0xa5, 0x60, 0x5a, 0x2b, 0xc2, 0x83, 0xca, 0xcc, 0x38, 0x1e, 0xd2, 0x82, 0x12, 0xdc, 0xc0, + 0x69, 0xee, 0x0c, 0x9c, 0xf0, 0x8d, 0x2f, 0x61, 0xdd, 0xa9, 0xbc, 0xab, 0xa0, 0x72, 0xdf, 0x7a, + 0x7a, 0x92, 0xdf, 0x8f, 0xe2, 0x3b, 0x3c, 0x49, 0x36, 0xa6, 0x6f, 0xd5, 0x1b, 0x4e, 0xfd, 0xb2, + 0x82, 0xfa, 0x7e, 0x60, 0xb4, 0x93, 0xec, 0x47, 0xf8, 0x12, 0x80, 0x2b, 0x39, 0xc7, 0xac, 0xf0, + 0x39, 0x3c, 0x74, 0xc6, 0xee, 0xfc, 0x09, 0xbe, 0xc0, 0xd1, 0xaa, 0x08, 0x73, 0x32, 0x75, 0x59, + 0x85, 0x4d, 0x77, 0x39, 0xa9, 0x70, 0xf9, 0xce, 0x0e, 0xd0, 0xf6, 0x6a, 0x67, 0x21, 0x3e, 0x41, + 0xdb, 0x20, 0x4b, 0x4a, 0xc5, 0xff, 0x9c, 0xe2, 0x9b, 0x0a, 0x8a, 0xdb, 0x95, 0xa1, 0x60, 0xb6, + 0xeb, 0x73, 0x0d, 0xc7, 0x3f, 0x90, 0x25, 0x68, 0x26, 0xbe, 0x67, 0x61, 0xeb, 0xef, 0x10, 0x7d, + 0x83, 0x88, 0x6f, 0x10, 0x89, 0x8b, 0x06, 0x8d, 0x7c, 0x81, 0xe8, 0x91, 0x27, 0xfb, 0x0d, 0xfa, + 0xd8, 0x68, 0xfd, 0xdf, 0x81, 0x2b, 0x0a, 0xaf, 0xb8, 0x4a, 0x1f, 0x7f, 0xd2, 0xe7, 0xda, 0x4d, + 0x7d, 0xc1, 0xf5, 0xaf, 0x83, 0xee, 0xd7, 0x3e, 0x65, 0x6b, 0x32, 0x28, 0xa0, 0xf1, 0x1d, 0x74, + 0x58, 0x42, 0xaf, 0xb9, 0x9e, 0x36, 0x5d, 0x53, 0x2f, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x54, + 0xdd, 0xba, 0xf9, 0x34, 0x04, 0x00, 0x00, } diff --git a/transport/internet/kcp/config.proto b/transport/internet/kcp/config.proto index 41658be80..b6f61ab47 100644 --- a/transport/internet/kcp/config.proto +++ b/transport/internet/kcp/config.proto @@ -51,5 +51,5 @@ message Config { WriteBuffer write_buffer = 6; ReadBuffer read_buffer = 7; v2ray.core.common.serial.TypedMessage header_config = 8; - ConnectionReuse connection_reuse = 9; + reserved 9; } \ No newline at end of file diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index baedeb0b1..d59b162ad 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -10,7 +10,6 @@ import ( "v2ray.com/core/app/log" "v2ray.com/core/common/errors" "v2ray.com/core/common/predicate" - "v2ray.com/core/transport/internet/internal" ) var ( @@ -165,21 +164,19 @@ func (u *Updater) SetInterval(d time.Duration) { type SystemConnection interface { net.Conn - Id() internal.ConnectionID Reset(func([]Segment)) Overhead() int } // Connection is a KCP connection over UDP. type Connection struct { - conn SystemConnection - connRecycler internal.ConnectionRecyler - rd time.Time - wd time.Time // write deadline - since int64 - dataInput chan bool - dataOutput chan bool - Config *Config + conn SystemConnection + rd time.Time + wd time.Time // write deadline + since int64 + dataInput chan bool + dataOutput chan bool + Config *Config conv uint16 state State @@ -197,24 +194,21 @@ type Connection struct { dataUpdater *Updater pingUpdater *Updater - - reusable bool } // NewConnection create a new KCP connection between local and remote. -func NewConnection(conv uint16, sysConn SystemConnection, recycler internal.ConnectionRecyler, config *Config) *Connection { +func NewConnection(conv uint16, sysConn SystemConnection, config *Config) *Connection { log.Trace(errors.New("KCP|Connection: creating connection ", conv)) conn := &Connection{ - conv: conv, - conn: sysConn, - connRecycler: recycler, - since: nowMillisec(), - dataInput: make(chan bool, 1), - dataOutput: make(chan bool, 1), - Config: config, - output: NewSegmentWriter(sysConn), - mss: config.GetMTUValue() - uint32(sysConn.Overhead()) - DataSegmentOverhead, + conv: conv, + conn: sysConn, + since: nowMillisec(), + dataInput: make(chan bool, 1), + dataOutput: make(chan bool, 1), + Config: config, + output: NewSegmentWriter(sysConn), + mss: config.GetMTUValue() - uint32(sysConn.Overhead()) - DataSegmentOverhead, roundTrip: &RoundTripInfo{ rto: 100, minRtt: config.GetTTIValue(), @@ -443,14 +437,6 @@ func (v *Connection) updateTask() { v.flush() } -func (v *Connection) Reusable() bool { - return v.Config.IsConnectionReuse() && v.reusable -} - -func (v *Connection) SetReusable(b bool) { - v.reusable = b -} - func (v *Connection) Terminate() { if v == nil { return @@ -461,11 +447,7 @@ func (v *Connection) Terminate() { v.OnDataInput() v.OnDataOutput() - if v.Config.IsConnectionReuse() && v.reusable { - v.connRecycler.Put(v.conn.Id(), v.conn) - } else { - v.conn.Close() - } + v.conn.Close() v.sendingWorker.Release() v.receivingWorker.Release() } diff --git a/transport/internet/kcp/connection_test.go b/transport/internet/kcp/connection_test.go index 1c47b5acd..ae660622b 100644 --- a/transport/internet/kcp/connection_test.go +++ b/transport/internet/kcp/connection_test.go @@ -6,7 +6,6 @@ import ( "time" "v2ray.com/core/testing/assert" - "v2ray.com/core/transport/internet/internal" . "v2ray.com/core/transport/internet/kcp" ) @@ -48,20 +47,12 @@ func (o *NoOpConn) SetWriteDeadline(time.Time) error { return nil } -func (o *NoOpConn) Id() internal.ConnectionID { - return internal.ConnectionID{} -} - func (o *NoOpConn) Reset(input func([]Segment)) {} -type NoOpRecycler struct{} - -func (o *NoOpRecycler) Put(internal.ConnectionID, net.Conn) {} - func TestConnectionReadTimeout(t *testing.T) { assert := assert.On(t) - conn := NewConnection(1, &NoOpConn{}, &NoOpRecycler{}, &Config{}) + conn := NewConnection(1, &NoOpConn{}, &Config{}) conn.SetReadDeadline(time.Now().Add(time.Second)) b := make([]byte, 1024) diff --git a/transport/internet/kcp/dialer.go b/transport/internet/kcp/dialer.go index 40a2cbef6..63c3e37ed 100644 --- a/transport/internet/kcp/dialer.go +++ b/transport/internet/kcp/dialer.go @@ -15,19 +15,16 @@ import ( "v2ray.com/core/common/errors" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" ) var ( globalConv = uint32(dice.RandomUint16()) - globalPool = internal.NewConnectionPool() ) type ClientConnection struct { sync.RWMutex net.Conn - id internal.ConnectionID input func([]Segment) reader PacketReader writer PacketWriter @@ -57,10 +54,6 @@ func (o *ClientConnection) Read([]byte) (int, error) { panic("KCP|ClientConnection: Read should not be called.") } -func (o *ClientConnection) Id() internal.ConnectionID { - return o.id -} - func (o *ClientConnection) Close() error { return o.Conn.Close() } @@ -114,25 +107,18 @@ func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection, log.Trace(errors.New("KCP|Dialer: Dialing KCP to ", dest)) src := internet.DialerSourceFromContext(ctx) - id := internal.NewConnectionID(src, dest) - conn := globalPool.Get(id) - if conn == nil { - rawConn, err := internet.DialSystem(ctx, src, dest) - if err != nil { - log.Trace(errors.New("KCP|Dialer: Failed to dial to dest: ", err).AtError()) - return nil, err - } - c := &ClientConnection{ - Conn: rawConn, - id: id, - } - go c.Run() - conn = c + rawConn, err := internet.DialSystem(ctx, src, dest) + if err != nil { + log.Trace(errors.New("KCP|Dialer: Failed to dial to dest: ", err).AtError()) + return nil, err } + conn := &ClientConnection{ + Conn: rawConn, + } + go conn.Run() kcpSettings := internet.TransportSettingsFromContext(ctx).(*Config) - clientConn := conn.(*ClientConnection) header, err := kcpSettings.GetPackerHeader() if err != nil { return nil, errors.New("KCP|Dialer: Failed to create packet header.").Base(err) @@ -141,9 +127,9 @@ func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection, if err != nil { return nil, errors.New("KCP|Dialer: Failed to create security.").Base(err) } - clientConn.ResetSecurity(header, security) + conn.ResetSecurity(header, security) conv := uint16(atomic.AddUint32(&globalConv, 1)) - session := NewConnection(conv, clientConn, globalPool, kcpSettings) + session := NewConnection(conv, conn, kcpSettings) var iConn internet.Connection iConn = session @@ -156,7 +142,7 @@ func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection, config.ServerName = dest.Address.Domain() } tlsConn := tls.Client(iConn, config) - iConn = UnreusableConnection{Conn: tlsConn} + iConn = tlsConn } } diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index 339cb5227..37beb9115 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -15,7 +15,6 @@ import ( "v2ray.com/core/common/errors" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" "v2ray.com/core/transport/internet/udp" ) @@ -27,7 +26,6 @@ type ConnectionID struct { } type ServerConnection struct { - id internal.ConnectionID local net.Addr remote net.Addr writer PacketWriter @@ -73,10 +71,6 @@ func (*ServerConnection) SetWriteDeadline(time.Time) error { return nil } -func (c *ServerConnection) Id() internal.ConnectionID { - return c.id -} - // Listener defines a server listening for connections type Listener struct { sync.Mutex @@ -94,7 +88,6 @@ type Listener struct { func NewListener(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- internet.Connection) (*Listener, error) { networkSettings := internet.TransportSettingsFromContext(ctx) kcpSettings := networkSettings.(*Config) - kcpSettings.ConnectionReuse = &ConnectionReuse{Enable: false} header, err := kcpSettings.GetPackerHeader() if err != nil { @@ -182,7 +175,6 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src v2net.Destination, origina } localAddr := v.hub.Addr() sConn := &ServerConnection{ - id: internal.NewConnectionID(v2net.LocalHostIP, src), local: localAddr, remote: remoteAddr, writer: &KCPPacketWriter{ @@ -192,17 +184,16 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src v2net.Destination, origina }, closer: writer, } - conn = NewConnection(conv, sConn, v, v.config) + conn = NewConnection(conv, sConn, v.config) var netConn internet.Connection = conn if v.tlsConfig != nil { tlsConn := tls.Server(conn, v.tlsConfig) - netConn = UnreusableConnection{Conn: tlsConn} + netConn = tlsConn } select { case v.conns <- netConn: case <-time.After(time.Second * 5): - conn.SetReusable(false) conn.Close() return } @@ -248,8 +239,6 @@ func (v *Listener) Addr() net.Addr { return v.hub.Addr() } -func (v *Listener) Put(internal.ConnectionID, net.Conn) {} - type Writer struct { id ConnectionID dest v2net.Destination diff --git a/transport/internet/kcp/tls.go b/transport/internet/kcp/tls.go deleted file mode 100644 index 3d036dd6a..000000000 --- a/transport/internet/kcp/tls.go +++ /dev/null @@ -1,13 +0,0 @@ -package kcp - -import "net" - -type UnreusableConnection struct { - net.Conn -} - -func (c UnreusableConnection) Reusable() bool { - return false -} - -func (c UnreusableConnection) SetReusable(bool) {} diff --git a/transport/internet/tcp/config.go b/transport/internet/tcp/config.go index 021b974b3..cd8cc0f79 100644 --- a/transport/internet/tcp/config.go +++ b/transport/internet/tcp/config.go @@ -5,13 +5,6 @@ import ( "v2ray.com/core/transport/internet" ) -func (v *Config) IsConnectionReuse() bool { - if v == nil || v.ConnectionReuse == nil { - return true - } - return v.ConnectionReuse.Enable -} - func init() { common.Must(internet.RegisterProtocolConfigCreator(internet.TransportProtocol_TCP, func() interface{} { return new(Config) diff --git a/transport/internet/tcp/config.pb.go b/transport/internet/tcp/config.pb.go index ac022e965..08d1fc565 100644 --- a/transport/internet/tcp/config.pb.go +++ b/transport/internet/tcp/config.pb.go @@ -16,38 +16,14 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -type ConnectionReuse struct { - Enable bool `protobuf:"varint,1,opt,name=enable" json:"enable,omitempty"` -} - -func (m *ConnectionReuse) Reset() { *m = ConnectionReuse{} } -func (m *ConnectionReuse) String() string { return proto.CompactTextString(m) } -func (*ConnectionReuse) ProtoMessage() {} -func (*ConnectionReuse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -func (m *ConnectionReuse) GetEnable() bool { - if m != nil { - return m.Enable - } - return false -} - type Config struct { - ConnectionReuse *ConnectionReuse `protobuf:"bytes,1,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"` - HeaderSettings *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,2,opt,name=header_settings,json=headerSettings" json:"header_settings,omitempty"` + HeaderSettings *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,2,opt,name=header_settings,json=headerSettings" json:"header_settings,omitempty"` } func (m *Config) Reset() { *m = Config{} } func (m *Config) String() string { return proto.CompactTextString(m) } func (*Config) ProtoMessage() {} -func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -func (m *Config) GetConnectionReuse() *ConnectionReuse { - if m != nil { - return m.ConnectionReuse - } - return nil -} +func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } func (m *Config) GetHeaderSettings() *v2ray_core_common_serial.TypedMessage { if m != nil { @@ -57,30 +33,25 @@ func (m *Config) GetHeaderSettings() *v2ray_core_common_serial.TypedMessage { } func init() { - proto.RegisterType((*ConnectionReuse)(nil), "v2ray.core.transport.internet.tcp.ConnectionReuse") proto.RegisterType((*Config)(nil), "v2ray.core.transport.internet.tcp.Config") } func init() { proto.RegisterFile("v2ray.com/core/transport/internet/tcp/config.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 273 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xc1, 0x4a, 0xf4, 0x30, - 0x14, 0x85, 0xe9, 0xfc, 0x50, 0x7e, 0x22, 0x58, 0xe9, 0x42, 0x06, 0x57, 0xce, 0x80, 0xa2, 0x9b, - 0x44, 0xea, 0x1b, 0xd8, 0x95, 0x0b, 0x51, 0x62, 0x71, 0x21, 0x48, 0xc9, 0xdc, 0xb9, 0xd6, 0xc2, - 0x34, 0x37, 0x24, 0x57, 0xa1, 0xaf, 0xe4, 0x13, 0xf8, 0x78, 0xd2, 0x76, 0x5a, 0xa4, 0x9b, 0x59, - 0x06, 0xbe, 0xef, 0xe4, 0x9c, 0x2b, 0xb2, 0xaf, 0xcc, 0x9b, 0x56, 0x02, 0x35, 0x0a, 0xc8, 0xa3, - 0x62, 0x6f, 0x6c, 0x70, 0xe4, 0x59, 0xd5, 0x96, 0xd1, 0x5b, 0x64, 0xc5, 0xe0, 0x14, 0x90, 0x7d, - 0xaf, 0x2b, 0xe9, 0x3c, 0x31, 0xa5, 0xab, 0xd1, 0xf1, 0x28, 0x27, 0x5e, 0x8e, 0xbc, 0x64, 0x70, - 0x67, 0x37, 0xb3, 0x58, 0xa0, 0xa6, 0x21, 0xab, 0x02, 0xfa, 0xda, 0xec, 0x14, 0xb7, 0x0e, 0xb7, - 0x65, 0x83, 0x21, 0x98, 0x0a, 0x87, 0xd0, 0xf5, 0xb5, 0x48, 0x72, 0xb2, 0x16, 0x81, 0x6b, 0xb2, - 0x1a, 0x3f, 0x03, 0xa6, 0xa7, 0x22, 0x46, 0x6b, 0x36, 0x3b, 0x5c, 0x46, 0xe7, 0xd1, 0xd5, 0x7f, - 0xbd, 0x7f, 0xad, 0x7f, 0x22, 0x11, 0xe7, 0x7d, 0xa1, 0xf4, 0x4d, 0x9c, 0xc0, 0x64, 0x95, 0xbe, - 0xd3, 0x7a, 0xf8, 0x28, 0xcb, 0xe4, 0xc1, 0x96, 0x72, 0xf6, 0xa1, 0x4e, 0x60, 0xd6, 0xe0, 0x51, - 0x24, 0x1f, 0x68, 0xb6, 0xe8, 0xcb, 0x80, 0xcc, 0xb5, 0xad, 0xc2, 0x72, 0xd1, 0xa7, 0x5f, 0xfe, - 0x4d, 0x1f, 0xc6, 0xc9, 0x61, 0x9c, 0x2c, 0xba, 0x71, 0x0f, 0xc3, 0x36, 0x7d, 0x3c, 0xe8, 0xcf, - 0x7b, 0xfb, 0x4e, 0x8b, 0x0b, 0xa0, 0xe6, 0x70, 0xb5, 0xa7, 0xe8, 0xf5, 0x1f, 0x83, 0xfb, 0x5e, - 0xac, 0x5e, 0x32, 0x6d, 0x5a, 0x99, 0x77, 0x68, 0x31, 0xa1, 0xf7, 0x23, 0x5a, 0x80, 0xdb, 0xc4, - 0xfd, 0x01, 0x6f, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x17, 0x2c, 0x8e, 0x55, 0xcb, 0x01, 0x00, - 0x00, + // 223 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8e, 0xc1, 0x4a, 0xc4, 0x30, + 0x10, 0x86, 0x69, 0x95, 0x45, 0x2a, 0xa8, 0xec, 0x49, 0x3c, 0xb9, 0x82, 0xe2, 0x69, 0x22, 0xf1, + 0x0d, 0xdc, 0x93, 0x82, 0x28, 0xb5, 0x78, 0xf0, 0x52, 0xe2, 0xec, 0x58, 0x03, 0x26, 0x13, 0x26, + 0x83, 0xd0, 0x57, 0xf2, 0x29, 0x65, 0x37, 0x76, 0x11, 0x2f, 0xde, 0xbf, 0xef, 0xfb, 0xff, 0xc6, + 0x7e, 0x5a, 0x71, 0x23, 0x20, 0x07, 0x83, 0x2c, 0x64, 0x54, 0x5c, 0xcc, 0x89, 0x45, 0x8d, 0x8f, + 0x4a, 0x12, 0x49, 0x8d, 0x62, 0x32, 0xc8, 0xf1, 0xcd, 0x0f, 0x90, 0x84, 0x95, 0xe7, 0x8b, 0xc9, + 0x11, 0x82, 0x2d, 0x0f, 0x13, 0x0f, 0x8a, 0xe9, 0xe4, 0xea, 0x4f, 0x16, 0x39, 0x04, 0x8e, 0x26, + 0x93, 0x78, 0xf7, 0x61, 0x74, 0x4c, 0xb4, 0xea, 0x03, 0xe5, 0xec, 0x06, 0x2a, 0xd1, 0xb3, 0xbe, + 0x99, 0x2d, 0x37, 0x23, 0xf3, 0x87, 0xe6, 0xf0, 0x9d, 0xdc, 0x8a, 0xa4, 0xcf, 0xa4, 0xea, 0xe3, + 0x90, 0x8f, 0xeb, 0xd3, 0xea, 0x72, 0xdf, 0x5e, 0xc0, 0xaf, 0xe1, 0x52, 0x84, 0x52, 0x84, 0x6e, + 0x5d, 0xbc, 0x2f, 0xc1, 0xf6, 0xa0, 0xe8, 0x4f, 0x3f, 0xf6, 0xdd, 0xee, 0x5e, 0x75, 0x54, 0xdf, + 0xb4, 0xcd, 0x39, 0x72, 0x80, 0x7f, 0xbf, 0x3f, 0x56, 0x2f, 0x3b, 0x8a, 0xe9, 0xab, 0x5e, 0x3c, + 0xdb, 0xd6, 0x8d, 0xb0, 0x5c, 0xa3, 0xdd, 0x16, 0xbd, 0x9d, 0xd0, 0x0e, 0xd3, 0xeb, 0x6c, 0xf3, + 0xfd, 0xfa, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x15, 0xf9, 0x1f, 0xa0, 0x46, 0x01, 0x00, 0x00, } diff --git a/transport/internet/tcp/config.proto b/transport/internet/tcp/config.proto index 37479f5ac..dadfb5938 100644 --- a/transport/internet/tcp/config.proto +++ b/transport/internet/tcp/config.proto @@ -8,10 +8,7 @@ option java_multiple_files = true; import "v2ray.com/core/common/serial/typed_message.proto"; -message ConnectionReuse { - bool enable = 1; -} message Config { - ConnectionReuse connection_reuse = 1; + reserved 1; v2ray.core.common.serial.TypedMessage header_settings = 2; } \ No newline at end of file diff --git a/transport/internet/tcp/dialer.go b/transport/internet/tcp/dialer.go index de9f823c8..b59afb3c6 100644 --- a/transport/internet/tcp/dialer.go +++ b/transport/internet/tcp/dialer.go @@ -3,61 +3,47 @@ package tcp import ( "context" "crypto/tls" - "net" "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/errors" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" ) -var ( - globalCache = internal.NewConnectionPool() -) - func Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, error) { log.Trace(errors.New("Internet|TCP: Dailing TCP to ", dest)) src := internet.DialerSourceFromContext(ctx) tcpSettings := internet.TransportSettingsFromContext(ctx).(*Config) - id := internal.NewConnectionID(src, dest) - var conn net.Conn - if dest.Network == v2net.Network_TCP && tcpSettings.IsConnectionReuse() { - conn = globalCache.Get(id) + conn, err := internet.DialSystem(ctx, src, dest) + if err != nil { + return nil, err } - if conn == nil { - var err error - conn, err = internet.DialSystem(ctx, src, dest) + if securitySettings := internet.SecuritySettingsFromContext(ctx); securitySettings != nil { + tlsConfig, ok := securitySettings.(*v2tls.Config) + if ok { + config := tlsConfig.GetTLSConfig() + if dest.Address.Family().IsDomain() { + config.ServerName = dest.Address.Domain() + } + conn = tls.Client(conn, config) + } + } + if tcpSettings.HeaderSettings != nil { + headerConfig, err := tcpSettings.HeaderSettings.GetInstance() if err != nil { - return nil, err + return nil, errors.New("Internet|TCP: Failed to get header settings.").Base(err) } - if securitySettings := internet.SecuritySettingsFromContext(ctx); securitySettings != nil { - tlsConfig, ok := securitySettings.(*v2tls.Config) - if ok { - config := tlsConfig.GetTLSConfig() - if dest.Address.Family().IsDomain() { - config.ServerName = dest.Address.Domain() - } - conn = tls.Client(conn, config) - } - } - if tcpSettings.HeaderSettings != nil { - headerConfig, err := tcpSettings.HeaderSettings.GetInstance() - if err != nil { - return nil, errors.New("Internet|TCP: Failed to get header settings.").Base(err) - } - auth, err := internet.CreateConnectionAuthenticator(headerConfig) - if err != nil { - return nil, errors.New("Internet|TCP: Failed to create header authenticator.").Base(err) - } - conn = auth.Client(conn) + auth, err := internet.CreateConnectionAuthenticator(headerConfig) + if err != nil { + return nil, errors.New("Internet|TCP: Failed to create header authenticator.").Base(err) } + conn = auth.Client(conn) } - return internal.NewConnection(id, conn, globalCache, internal.ReuseConnection(tcpSettings.IsConnectionReuse())), nil + return internet.Connection(conn), nil } func init() { diff --git a/transport/internet/tcp/hub.go b/transport/internet/tcp/hub.go index 43ec711bf..b21177478 100644 --- a/transport/internet/tcp/hub.go +++ b/transport/internet/tcp/hub.go @@ -12,7 +12,6 @@ import ( v2net "v2ray.com/core/common/net" "v2ray.com/core/common/retry" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" ) @@ -97,23 +96,13 @@ func (v *TCPListener) KeepAccepting() { } select { - case v.conns <- internal.NewConnection(internal.ConnectionID{}, conn, v, internal.ReuseConnection(v.config.IsConnectionReuse())): + case v.conns <- internet.Connection(conn): case <-time.After(time.Second * 5): conn.Close() } } } -func (v *TCPListener) Put(id internal.ConnectionID, conn net.Conn) { - select { - case v.conns <- internal.NewConnection(internal.ConnectionID{}, conn, v, internal.ReuseConnection(v.config.IsConnectionReuse())): - case <-time.After(time.Second * 5): - conn.Close() - case <-v.ctx.Done(): - conn.Close() - } -} - func (v *TCPListener) Addr() net.Addr { return v.listener.Addr() } diff --git a/transport/internet/udp/dialer.go b/transport/internet/udp/dialer.go index e70220f36..18557c1ed 100644 --- a/transport/internet/udp/dialer.go +++ b/transport/internet/udp/dialer.go @@ -6,7 +6,6 @@ import ( "v2ray.com/core/common" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" ) func init() { @@ -18,6 +17,6 @@ func init() { return nil, err } // TODO: handle dialer options - return internal.NewConnection(internal.NewConnectionID(src, dest), conn, internal.NoOpConnectionRecyler{}, internal.ReuseConnection(false)), nil + return internet.Connection(conn), nil })) } diff --git a/transport/internet/websocket/config.go b/transport/internet/websocket/config.go index 93a8ad07a..4082f4fde 100644 --- a/transport/internet/websocket/config.go +++ b/transport/internet/websocket/config.go @@ -5,13 +5,6 @@ import ( "v2ray.com/core/transport/internet" ) -func (c *Config) IsConnectionReuse() bool { - if c == nil || c.ConnectionReuse == nil { - return true - } - return c.ConnectionReuse.Enable -} - func (c *Config) GetNormailzedPath() string { path := c.Path if len(path) == 0 { diff --git a/transport/internet/websocket/config.pb.go b/transport/internet/websocket/config.pb.go index 670af4046..c2a6abaf1 100644 --- a/transport/internet/websocket/config.pb.go +++ b/transport/internet/websocket/config.pb.go @@ -32,8 +32,6 @@ func (m *ConnectionReuse) GetEnable() bool { } type Config struct { - // Whether or not to reuse WebSocket connections. - ConnectionReuse *ConnectionReuse `protobuf:"bytes,1,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"` // URL path to the WebSocket service. Empty value means root(/). Path string `protobuf:"bytes,2,opt,name=path" json:"path,omitempty"` } @@ -43,13 +41,6 @@ func (m *Config) String() string { return proto.CompactTextString(m) func (*Config) ProtoMessage() {} func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } -func (m *Config) GetConnectionReuse() *ConnectionReuse { - if m != nil { - return m.ConnectionReuse - } - return nil -} - func (m *Config) GetPath() string { if m != nil { return m.Path @@ -67,20 +58,18 @@ func init() { } var fileDescriptor0 = []byte{ - // 226 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0xd0, 0xb1, 0x4a, 0x03, 0x41, - 0x10, 0x80, 0x61, 0x36, 0xc8, 0x61, 0xd6, 0x22, 0x72, 0x85, 0xa4, 0x0c, 0x69, 0x12, 0x11, 0x76, - 0xe1, 0x6c, 0x52, 0x7b, 0x95, 0x9d, 0x2c, 0xa2, 0x60, 0x23, 0x7b, 0xe3, 0xa8, 0x87, 0x66, 0xe6, - 0x98, 0x1b, 0x95, 0x94, 0xbe, 0x8e, 0x4f, 0x29, 0x39, 0xb2, 0x5b, 0xa4, 0xba, 0x6e, 0x07, 0xe6, - 0xe3, 0x5f, 0xc6, 0x6e, 0xbe, 0x2b, 0x89, 0x3b, 0x07, 0xbc, 0xf5, 0xc0, 0x82, 0x5e, 0x25, 0x52, - 0xdf, 0xb1, 0xa8, 0x6f, 0x49, 0x51, 0x08, 0xd5, 0xff, 0x60, 0xd3, 0x33, 0x7c, 0xa0, 0x7a, 0x60, - 0x7a, 0x6d, 0xdf, 0x5c, 0x27, 0xac, 0x5c, 0xae, 0x92, 0x14, 0x74, 0x59, 0xb9, 0xa4, 0x5c, 0x56, - 0xcb, 0x4b, 0x3b, 0xab, 0x99, 0x08, 0x41, 0x5b, 0xa6, 0x80, 0x5f, 0x3d, 0x96, 0x17, 0xb6, 0x40, - 0x8a, 0xcd, 0x27, 0xce, 0xcd, 0xc2, 0xac, 0x4f, 0xc3, 0x61, 0x5a, 0xfe, 0x1a, 0x5b, 0xd4, 0x43, - 0xa4, 0x04, 0x7b, 0x0e, 0x59, 0x3d, 0xcb, 0x9e, 0x0d, 0xcb, 0x67, 0xd5, 0xc6, 0x8d, 0x2c, 0xbb, - 0xa3, 0x6c, 0x98, 0xc1, 0xd1, 0x3f, 0x4a, 0x7b, 0xd2, 0x45, 0x7d, 0x9f, 0x4f, 0x16, 0x66, 0x3d, - 0x0d, 0xc3, 0xfb, 0xe6, 0xc5, 0x5e, 0x01, 0x6f, 0xc7, 0x36, 0xee, 0xcc, 0xd3, 0x34, 0x0f, 0x7f, - 0x93, 0xd5, 0x43, 0x15, 0xe2, 0xce, 0xd5, 0x7b, 0x76, 0x9f, 0xd9, 0x6d, 0x62, 0x8f, 0x69, 0xb3, - 0x29, 0x86, 0x23, 0x5e, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0x22, 0x65, 0x99, 0x41, 0x80, 0x01, - 0x00, 0x00, + // 204 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0xcf, 0x31, 0x4b, 0xc7, 0x30, + 0x10, 0x05, 0x70, 0x52, 0xfe, 0x94, 0x36, 0x8b, 0x92, 0x41, 0x3a, 0x96, 0x2e, 0xad, 0x08, 0x09, + 0xd4, 0xc5, 0xd9, 0x4e, 0x3a, 0x49, 0x10, 0x05, 0xb7, 0x34, 0x9e, 0x5a, 0xb4, 0x77, 0x25, 0x3d, + 0x95, 0x7e, 0x25, 0x3f, 0xa5, 0x58, 0x4c, 0xe6, 0xff, 0x76, 0x0f, 0xee, 0xc7, 0xe3, 0xc9, 0xab, + 0xaf, 0x3e, 0xb8, 0x4d, 0x7b, 0x9a, 0x8d, 0xa7, 0x00, 0x86, 0x83, 0xc3, 0x75, 0xa1, 0xc0, 0x66, + 0x42, 0x86, 0x80, 0xc0, 0xe6, 0x1b, 0xc6, 0x95, 0xfc, 0x3b, 0xb0, 0xf1, 0x84, 0x2f, 0xd3, 0xab, + 0x5e, 0x02, 0x31, 0xa9, 0x36, 0xca, 0x00, 0x3a, 0x29, 0x1d, 0x95, 0x4e, 0xaa, 0x39, 0x97, 0x27, + 0x03, 0x21, 0x82, 0xe7, 0x89, 0xd0, 0xc2, 0xe7, 0x0a, 0xea, 0x4c, 0xe6, 0x80, 0x6e, 0xfc, 0x80, + 0x4a, 0xd4, 0xa2, 0x2b, 0xec, 0x7f, 0x6a, 0x1a, 0x99, 0x0f, 0x7b, 0x87, 0x52, 0xf2, 0xb0, 0x38, + 0x7e, 0xab, 0xb2, 0x5a, 0x74, 0xa5, 0xdd, 0xef, 0xdb, 0x43, 0x21, 0x4e, 0xb3, 0xeb, 0x67, 0x79, + 0xe1, 0x69, 0xd6, 0x47, 0xb6, 0xdf, 0x89, 0xa7, 0x32, 0x85, 0x9f, 0xac, 0x7d, 0xe8, 0xad, 0xdb, + 0xf4, 0xf0, 0xc7, 0xee, 0x13, 0xbb, 0x89, 0xec, 0x31, 0x7e, 0x8e, 0xf9, 0x3e, 0xf2, 0xf2, 0x37, + 0x00, 0x00, 0xff, 0xff, 0x7a, 0xf3, 0x2b, 0x77, 0x20, 0x01, 0x00, 0x00, } diff --git a/transport/internet/websocket/config.proto b/transport/internet/websocket/config.proto index 3de9c63f5..ebf47dbf4 100644 --- a/transport/internet/websocket/config.proto +++ b/transport/internet/websocket/config.proto @@ -11,8 +11,7 @@ message ConnectionReuse { } message Config { - // Whether or not to reuse WebSocket connections. - ConnectionReuse connection_reuse = 1; + reserved 1; // URL path to the WebSocket service. Empty value means root(/). string path = 2; diff --git a/transport/internet/websocket/dialer.go b/transport/internet/websocket/dialer.go index db8617e10..15f7cd209 100644 --- a/transport/internet/websocket/dialer.go +++ b/transport/internet/websocket/dialer.go @@ -10,32 +10,17 @@ import ( "v2ray.com/core/common/errors" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" ) -var ( - globalCache = internal.NewConnectionPool() -) - func Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, error) { - log.Trace(errors.New("WebSocket|Dialer: Creating connection to ", dest)) - src := internet.DialerSourceFromContext(ctx) - wsSettings := internet.TransportSettingsFromContext(ctx).(*Config) + log.Trace(errors.New("creating connection to ", dest).Path("Transport", "Internet", "WebSocket")) - id := internal.NewConnectionID(src, dest) - var conn net.Conn - if dest.Network == v2net.Network_TCP && wsSettings.IsConnectionReuse() { - conn = globalCache.Get(id) + conn, err := dialWebsocket(ctx, dest) + if err != nil { + return nil, errors.New("dial failed").Path("WebSocket", "Dialer") } - if conn == nil { - var err error - conn, err = dialWebsocket(ctx, dest) - if err != nil { - return nil, errors.New("dial failed").Path("WebSocket", "Dialer") - } - } - return internal.NewConnection(id, conn, globalCache, internal.ReuseConnection(wsSettings.IsConnectionReuse())), nil + return internet.Connection(conn), nil } func init() { diff --git a/transport/internet/websocket/hub.go b/transport/internet/websocket/hub.go index 49b4d0052..c20e9c463 100644 --- a/transport/internet/websocket/hub.go +++ b/transport/internet/websocket/hub.go @@ -15,7 +15,6 @@ import ( "v2ray.com/core/common/errors" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" ) @@ -42,7 +41,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req select { case <-h.ln.ctx.Done(): conn.Close() - case h.ln.conns <- internal.NewConnection(internal.ConnectionID{}, conn, h.ln, internal.ReuseConnection(h.ln.config.IsConnectionReuse())): + case h.ln.conns <- internet.Connection(conn): case <-time.After(time.Second * 5): conn.Close() } @@ -120,16 +119,6 @@ func converttovws(w http.ResponseWriter, r *http.Request) (*connection, error) { return &connection{wsc: conn}, nil } -func (ln *Listener) Put(id internal.ConnectionID, conn net.Conn) { - select { - case <-ln.ctx.Done(): - conn.Close() - case ln.conns <- internal.NewConnection(internal.ConnectionID{}, conn, ln, internal.ReuseConnection(ln.config.IsConnectionReuse())): - case <-time.After(time.Second * 5): - conn.Close() - } -} - func (ln *Listener) Addr() net.Addr { return ln.listener.Addr() } diff --git a/transport/internet/websocket/ws_test.go b/transport/internet/websocket/ws_test.go index 6a8f0689b..8e106ed31 100644 --- a/transport/internet/websocket/ws_test.go +++ b/transport/internet/websocket/ws_test.go @@ -30,7 +30,6 @@ func Test_listenWSAndDial(t *testing.T) { n, err := c.Read(b[:]) //assert.Error(err).IsNil() if err != nil { - c.SetReusable(false) return } assert.Bool(bytes.HasPrefix(b[:n], []byte("Test connection"))).IsTrue() @@ -87,9 +86,6 @@ func Test_listenWSAndDial_TLS(t *testing.T) { ctx := internet.ContextWithTransportSettings(context.Background(), &Config{ Path: "wss", - ConnectionReuse: &ConnectionReuse{ - Enable: true, - }, }) ctx = internet.ContextWithSecuritySettings(ctx, &v2tls.Config{ AllowInsecure: true,