1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-17 14:57:44 -05:00

completely remove connection reuse feature

This commit is contained in:
Darien Raymond 2017-04-07 21:54:40 +02:00
parent cc8935ee02
commit ade8453c0a
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
40 changed files with 139 additions and 920 deletions

View File

@ -88,7 +88,6 @@ func (w *tcpWorker) handleConnections(conns <-chan internet.Connection) {
for {
select {
case conn := <-conns:
conn.SetReusable(false)
conn.Close()
default:
break L
@ -166,12 +165,6 @@ func (*udpConn) SetWriteDeadline(time.Time) error {
return nil
}
func (*udpConn) Reusable() bool {
return false
}
func (*udpConn) SetReusable(bool) {}
type udpWorker struct {
sync.RWMutex

View File

@ -191,11 +191,3 @@ func (v *Connection) SetReadDeadline(t time.Time) error {
func (v *Connection) SetWriteDeadline(t time.Time) error {
return nil
}
func (v *Connection) Reusable() bool {
return false
}
func (v *Connection) SetReusable(bool) {
}

View File

@ -16,8 +16,8 @@ func TestRequestOptionSet(t *testing.T) {
option.Set(RequestOptionChunkStream)
assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue()
option.Set(RequestOptionConnectionReuse)
assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue()
option.Set(RequestOptionChunkMasking)
assert.Bool(option.Has(RequestOptionChunkMasking)).IsTrue()
assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue()
}
@ -26,9 +26,9 @@ func TestRequestOptionClear(t *testing.T) {
var option RequestOption
option.Set(RequestOptionChunkStream)
option.Set(RequestOptionConnectionReuse)
option.Set(RequestOptionChunkMasking)
option.Clear(RequestOptionChunkStream)
assert.Bool(option.Has(RequestOptionChunkStream)).IsFalse()
assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue()
assert.Bool(option.Has(RequestOptionChunkMasking)).IsTrue()
}

View File

@ -45,7 +45,6 @@ func (d *DokodemoDoor) Network() net.NetworkList {
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
log.Trace(errors.New("Dokodemo: processing connection from: ", conn.RemoteAddr()).AtDebug())
conn.SetReusable(false)
dest := net.Destination{
Network: network,
Address: d.address,

View File

@ -105,8 +105,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
}
defer conn.Close()
conn.SetReusable(false)
timeout := time.Second * time.Duration(v.timeout)
if timeout == 0 {
timeout = time.Minute * 5

View File

@ -69,8 +69,6 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
}
func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
conn.SetReusable(false)
conn.SetReadDeadline(time.Now().Add(time.Second * 8))
reader := bufio.NewReaderSize(conn, 2048)

View File

@ -65,7 +65,6 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
log.Trace(errors.New("tunneling request to ", destination, " via ", server.Destination()).Path("Proxy", "Shadowsocks", "Client"))
defer conn.Close()
conn.SetReusable(false)
request := &protocol.RequestHeader{
Version: Version,

View File

@ -60,8 +60,6 @@ func (s *Server) Network() net.NetworkList {
}
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
conn.SetReusable(false)
switch network {
case net.Network_TCP:
return s.handleConnection(ctx, conn, dispatcher)
@ -183,7 +181,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
responseDone := signal.ExecuteAsync(func() error {
defer ray.InboundInput().Close()
if err := buf.PipeUntilEOF(timer, bodyReader, ray.InboundInput()); err != nil {
mergeReader := buf.NewMergingReader(bodyReader)
if err := buf.PipeUntilEOF(timer, mergeReader, ray.InboundInput()); err != nil {
return errors.New("failed to transport all TCP request").Base(err).Path("Shadowsocks", "Server")
}
return nil

View File

@ -59,7 +59,6 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
}
defer conn.Close()
conn.SetReusable(false)
request := &protocol.RequestHeader{
Version: socks5Version,

View File

@ -48,8 +48,6 @@ func (s *Server) Network() net.NetworkList {
}
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
conn.SetReusable(false)
switch network {
case net.Network_TCP:
return s.processTCP(ctx, conn, dispatcher)

View File

@ -31,7 +31,6 @@ func TestRequestSerialization(t *testing.T) {
Version: 1,
User: user,
Command: protocol.RequestCommandTCP,
Option: protocol.RequestOptionConnectionReuse,
Address: v2net.DomainAddress("www.v2ray.com"),
Port: v2net.Port(443),
Security: protocol.Security(protocol.SecurityType_AES128_GCM),

View File

@ -138,8 +138,9 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio
bodyWriter := session.EncodeResponseBody(request, output)
mergeReader := buf.NewMergingReader(input)
// Optimize for small response packet
data, err := input.Read()
data, err := mergeReader.Read()
if err != nil {
return err
}
@ -155,7 +156,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio
}
}
if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil {
if err := buf.PipeUntilEOF(timer, mergeReader, bodyWriter); err != nil {
return err
}
@ -181,7 +182,6 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i
log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
log.Trace(errors.New("VMess|Inbound: Invalid request from ", connection.RemoteAddr(), ": ", err))
}
connection.SetReusable(false)
return err
}
log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
@ -189,7 +189,6 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i
connection.SetReadDeadline(time.Time{})
connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
userSettings := request.User.GetSettings()
ctx = protocol.ContextWithUser(ctx, request.User)
@ -214,24 +213,18 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i
Command: v.generateCommand(ctx, request),
}
if connection.Reusable() {
response.Option.Set(protocol.ResponseOptionConnectionReuse)
}
responseDone := signal.ExecuteAsync(func() error {
return transferResponse(timer, session, request, response, output, writer)
})
if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
connection.SetReusable(false)
input.CloseError()
output.CloseError()
return errors.New("error during processing").Base(err).Path("VMess", "Inbound")
return errors.New("error during processing").Base(err).Path("Proxy", "VMess", "Inbound")
}
if err := writer.Flush(); err != nil {
connection.SetReusable(false)
return errors.New("error during flushing remaining data").Base(err).Path("VMess", "Inbound")
return errors.New("error during flushing remaining data").Base(err).Path("Proxy", "VMess", "Inbound")
}
runtime.KeepAlive(timer)

View File

@ -95,11 +95,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
request.Option.Set(protocol.RequestOptionChunkMasking)
}
conn.SetReusable(true)
if conn.Reusable() { // Conn reuse may be disabled on transportation layer
request.Option.Set(protocol.RequestOptionConnectionReuse)
}
input := outboundRay.OutboundInput()
output := outboundRay.OutboundOutput()
@ -154,8 +149,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
}
v.handleCommand(rec.Destination(), header.Command)
conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
reader.SetBuffered(false)
bodyReader := session.DecodeResponseBody(request, reader)
if err := buf.PipeUntilEOF(timer, bodyReader, output); err != nil {
@ -166,7 +159,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
})
if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
conn.SetReusable(false)
return errors.New("connection ends").Base(err).Path("VMess", "Outbound")
}
runtime.KeepAlive(timer)

View File

@ -341,11 +341,7 @@ func TestTLSOverWebSocket(t *testing.T) {
TransportSettings: []*internet.TransportConfig{
{
Protocol: internet.TransportProtocol_WebSocket,
Settings: serial.ToTypedMessage(&websocket.Config{
ConnectionReuse: &websocket.ConnectionReuse{
Enable: false,
},
}),
Settings: serial.ToTypedMessage(&websocket.Config{}),
},
},
SecurityType: serial.GetMessageType(&tls.Config{}),
@ -381,136 +377,3 @@ func TestTLSOverWebSocket(t *testing.T) {
CloseAllServers()
}
func TestTLSConnectionReuse(t *testing.T) {
assert := assert.On(t)
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
assert.Error(err).IsNil()
defer tcpServer.Close()
userID := protocol.NewID(uuid.New())
serverPort := pickPort()
serverConfig := &core.Config{
Inbound: []*proxyman.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortRange: v2net.SinglePortRange(serverPort),
Listen: v2net.NewIPOrDomain(v2net.LocalHostIP),
StreamSettings: &internet.StreamConfig{
SecurityType: serial.GetMessageType(&tls.Config{}),
SecuritySettings: []*serial.TypedMessage{
serial.ToTypedMessage(&tls.Config{
Certificate: []*tls.Certificate{tlsgen.GenerateCertificateForTest()},
}),
},
},
}),
ProxySettings: serial.ToTypedMessage(&inbound.Config{
User: []*protocol.User{
{
Account: serial.ToTypedMessage(&vmess.Account{
Id: userID.String(),
}),
},
},
}),
},
},
Outbound: []*proxyman.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}
clientPort := pickPort()
clientConfig := &core.Config{
Inbound: []*proxyman.InboundHandlerConfig{
{
ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
PortRange: v2net.SinglePortRange(clientPort),
Listen: v2net.NewIPOrDomain(v2net.LocalHostIP),
}),
ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
Address: v2net.NewIPOrDomain(dest.Address),
Port: uint32(dest.Port),
NetworkList: &v2net.NetworkList{
Network: []v2net.Network{v2net.Network_TCP},
},
}),
},
},
Outbound: []*proxyman.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&outbound.Config{
Receiver: []*protocol.ServerEndpoint{
{
Address: v2net.NewIPOrDomain(v2net.LocalHostIP),
Port: uint32(serverPort),
User: []*protocol.User{
{
Account: serial.ToTypedMessage(&vmess.Account{
Id: userID.String(),
}),
},
},
},
},
}),
SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{
StreamSettings: &internet.StreamConfig{
SecurityType: serial.GetMessageType(&tls.Config{}),
SecuritySettings: []*serial.TypedMessage{
serial.ToTypedMessage(&tls.Config{
AllowInsecure: true,
}),
},
},
}),
},
},
}
assert.Error(InitializeServerConfig(serverConfig)).IsNil()
assert.Error(InitializeServerConfig(clientConfig)).IsNil()
for i := 0; i < 5; i++ {
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: int(clientPort),
})
assert.Error(err).IsNil()
payload := "dokodemo request."
nBytes, err := conn.Write([]byte(payload))
assert.Error(err).IsNil()
assert.Int(nBytes).Equals(len(payload))
response := readFrom(conn, time.Second*2, len(payload))
assert.Bytes(response).Equals(xor([]byte(payload)))
assert.Error(conn.Close()).IsNil()
}
time.Sleep(time.Second * 10)
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: int(clientPort),
})
assert.Error(err).IsNil()
payload := "dokodemo request."
nBytes, err := conn.Write([]byte(payload))
assert.Error(err).IsNil()
assert.Int(nBytes).Equals(len(payload))
response := readFrom(conn, time.Second*2, len(payload))
assert.Bytes(response).Equals(xor([]byte(payload)))
assert.Error(conn.Close()).IsNil()
CloseAllServers()
}

View File

@ -97,17 +97,11 @@ func (v *KCPConfig) Build() (*serial.TypedMessage, error) {
}
type TCPConfig struct {
ConnectionReuse *bool `json:"connectionReuse"`
HeaderConfig json.RawMessage `json:"header"`
}
func (v *TCPConfig) Build() (*serial.TypedMessage, error) {
config := new(tcp.Config)
if v.ConnectionReuse != nil {
config.ConnectionReuse = &tcp.ConnectionReuse{
Enable: *v.ConnectionReuse,
}
}
if len(v.HeaderConfig) > 0 {
headerConfig, _, err := tcpHeaderLoader.Load(v.HeaderConfig)
if err != nil {
@ -124,7 +118,6 @@ func (v *TCPConfig) Build() (*serial.TypedMessage, error) {
}
type WebSocketConfig struct {
ConnectionReuse *bool `json:"connectionReuse"`
Path string `json:"Path"`
}
@ -132,11 +125,6 @@ func (v *WebSocketConfig) Build() (*serial.TypedMessage, error) {
config := &websocket.Config{
Path: v.Path,
}
if v.ConnectionReuse != nil {
config.ConnectionReuse = &websocket.ConnectionReuse{
Enable: *v.ConnectionReuse,
}
}
return serial.ToTypedMessage(config), nil
}

View File

@ -19,7 +19,6 @@ func TestTransportConfig(t *testing.T) {
rawJson := `{
"tcpSettings": {
"connectionReuse": true,
"header": {
"type": "http",
"request": {
@ -64,7 +63,6 @@ func TestTransportConfig(t *testing.T) {
case *tcp.Config:
settingsCount++
assert.Bool(settingsWithProtocol.Protocol == internet.TransportProtocol_TCP).IsTrue()
assert.Bool(settings.IsConnectionReuse()).IsTrue()
rawHeader, err := settings.HeaderSettings.GetInstance()
assert.Error(err).IsNil()
header := rawHeader.(*http.Config)

View File

@ -6,14 +6,8 @@ import (
type ConnectionHandler func(Connection)
type Reusable interface {
Reusable() bool
SetReusable(reuse bool)
}
type Connection interface {
net.Conn
Reusable
}
type SysFd interface {

View File

@ -1,181 +0,0 @@
package internal
import (
"io"
"net"
"sync"
"time"
v2net "v2ray.com/core/common/net"
)
// ConnectionID is the ID of a connection.
type ConnectionID struct {
Local v2net.Address
Remote v2net.Address
RemotePort v2net.Port
}
// NewConnectionID creates a new ConnectionId.
func NewConnectionID(source v2net.Address, dest v2net.Destination) ConnectionID {
return ConnectionID{
Local: source,
Remote: dest.Address,
RemotePort: dest.Port,
}
}
// Reuser determines whether a connection can be reused or not.
type Reuser struct {
// userEnabled indicates connection-reuse enabled by user.
userEnabled bool
// appEnabled indicates connection-reuse enabled by app.
appEnabled bool
}
// ReuseConnection returns a tracker for tracking connection reusability.
func ReuseConnection(reuse bool) *Reuser {
return &Reuser{
userEnabled: reuse,
appEnabled: reuse,
}
}
// Connection is an implementation of net.Conn with re-usability.
type Connection struct {
sync.RWMutex
id ConnectionID
conn net.Conn
listener ConnectionRecyler
reuser *Reuser
}
// NewConnection creates a new connection.
func NewConnection(id ConnectionID, conn net.Conn, manager ConnectionRecyler, reuser *Reuser) *Connection {
return &Connection{
id: id,
conn: conn,
listener: manager,
reuser: reuser,
}
}
// Read implements net.Conn.Read().
func (v *Connection) Read(b []byte) (int, error) {
conn := v.underlyingConn()
if conn == nil {
return 0, io.EOF
}
return conn.Read(b)
}
// Write implement net.Conn.Write().
func (v *Connection) Write(b []byte) (int, error) {
conn := v.underlyingConn()
if conn == nil {
return 0, io.ErrClosedPipe
}
return conn.Write(b)
}
// Close implements net.Conn.Close(). If the connection is reusable, the underlying connection will be recycled.
func (v *Connection) Close() error {
if v == nil {
return io.ErrClosedPipe
}
v.Lock()
defer v.Unlock()
if v.conn == nil {
return io.ErrClosedPipe
}
if v.Reusable() {
v.listener.Put(v.id, v.conn)
return nil
}
err := v.conn.Close()
v.conn = nil
return err
}
// LocalAddr implements net.Conn.LocalAddr().
func (v *Connection) LocalAddr() net.Addr {
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.LocalAddr()
}
// RemoteAddr implements net.Conn.RemoteAddr().
func (v *Connection) RemoteAddr() net.Addr {
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.RemoteAddr()
}
// SetDeadline implements net.Conn.SetDeadline().
func (v *Connection) SetDeadline(t time.Time) error {
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.SetDeadline(t)
}
// SetReadDeadline implements net.Conn.SetReadDeadline().
func (v *Connection) SetReadDeadline(t time.Time) error {
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.SetReadDeadline(t)
}
// SetWriteDeadline implements net.Conn.SetWriteDeadline().
func (v *Connection) SetWriteDeadline(t time.Time) error {
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.SetWriteDeadline(t)
}
// SetReusable implements internet.Reusable.SetReusable().
func (v *Connection) SetReusable(reusable bool) {
if v == nil {
return
}
v.reuser.appEnabled = reusable
}
// Reusable implements internet.Reusable.Reusable().
func (v *Connection) Reusable() bool {
if v == nil {
return false
}
return v.reuser.userEnabled && v.reuser.appEnabled
}
// SysFd implement internet.SysFd.SysFd().
func (v *Connection) SysFd() (int, error) {
conn := v.underlyingConn()
if conn == nil {
return 0, io.ErrClosedPipe
}
return GetSysFd(conn)
}
func (v *Connection) underlyingConn() net.Conn {
if v == nil {
return nil
}
v.RLock()
defer v.RUnlock()
return v.conn
}

View File

@ -1,134 +0,0 @@
package internal
import (
"net"
"sync"
"time"
"v2ray.com/core/common/signal"
)
// ConnectionRecyler is the interface for recycling connections.
type ConnectionRecyler interface {
// Put returns a connection back to a connection pool.
Put(ConnectionID, net.Conn)
}
type NoOpConnectionRecyler struct{}
func (NoOpConnectionRecyler) Put(ConnectionID, net.Conn) {}
// ExpiringConnection is a connection that will expire in certain time.
type ExpiringConnection struct {
conn net.Conn
expire time.Time
}
// Expired returns true if the connection has expired.
func (ec *ExpiringConnection) Expired() bool {
return ec.expire.Before(time.Now())
}
// Pool is a connection pool.
type Pool struct {
sync.RWMutex
connsByDest map[ConnectionID][]*ExpiringConnection
cleanupToken *signal.Semaphore
}
// NewConnectionPool creates a new Pool.
func NewConnectionPool() *Pool {
p := &Pool{
connsByDest: make(map[ConnectionID][]*ExpiringConnection),
cleanupToken: signal.NewSemaphore(1),
}
return p
}
// Get returns a connection with matching connection ID. Nil if not found.
func (p *Pool) Get(id ConnectionID) net.Conn {
p.Lock()
defer p.Unlock()
list, found := p.connsByDest[id]
if !found {
return nil
}
connIdx := -1
for idx, conn := range list {
if !conn.Expired() {
connIdx = idx
break
}
}
if connIdx == -1 {
return nil
}
listLen := len(list)
conn := list[connIdx]
if connIdx != listLen-1 {
list[connIdx] = list[listLen-1]
}
list = list[:listLen-1]
p.connsByDest[id] = list
return conn.conn
}
func (p *Pool) isEmpty() bool {
p.RLock()
defer p.RUnlock()
return len(p.connsByDest) == 0
}
func (p *Pool) cleanup() {
defer p.cleanupToken.Signal()
for !p.isEmpty() {
time.Sleep(time.Second * 5)
expiredConns := make([]net.Conn, 0, 16)
p.Lock()
for dest, list := range p.connsByDest {
validConns := make([]*ExpiringConnection, 0, len(list))
for _, conn := range list {
if conn.Expired() {
expiredConns = append(expiredConns, conn.conn)
} else {
validConns = append(validConns, conn)
}
}
if len(validConns) != len(list) {
p.connsByDest[dest] = validConns
}
}
p.Unlock()
for _, conn := range expiredConns {
conn.Close()
}
}
}
// Put implements ConnectionRecyler.Put().
func (p *Pool) Put(id ConnectionID, conn net.Conn) {
expiringConn := &ExpiringConnection{
conn: conn,
expire: time.Now().Add(time.Second * 4),
}
p.Lock()
defer p.Unlock()
list, found := p.connsByDest[id]
if !found {
list = []*ExpiringConnection{expiringConn}
} else {
list = append(list, expiringConn)
}
p.connsByDest[id] = list
select {
case <-p.cleanupToken.Wait():
go p.cleanup()
default:
}
}

View File

@ -1,73 +0,0 @@
package internal_test
import (
"net"
"testing"
"time"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/testing/assert"
. "v2ray.com/core/transport/internet/internal"
)
type TestConnection struct {
id string
closed bool
}
func (o *TestConnection) Read([]byte) (int, error) {
return 0, nil
}
func (o *TestConnection) Write([]byte) (int, error) {
return 0, nil
}
func (o *TestConnection) Close() error {
o.closed = true
return nil
}
func (o *TestConnection) LocalAddr() net.Addr {
return nil
}
func (o *TestConnection) RemoteAddr() net.Addr {
return nil
}
func (o *TestConnection) SetDeadline(t time.Time) error {
return nil
}
func (o *TestConnection) SetReadDeadline(t time.Time) error {
return nil
}
func (o *TestConnection) SetWriteDeadline(t time.Time) error {
return nil
}
func TestConnectionCache(t *testing.T) {
assert := assert.On(t)
pool := NewConnectionPool()
conn := pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
assert.Pointer(conn).IsNil()
pool.Put(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), &TestConnection{id: "test"})
conn = pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
assert.String(conn.(*TestConnection).id).Equals("test")
}
func TestConnectionRecycle(t *testing.T) {
assert := assert.On(t)
pool := NewConnectionPool()
c := &TestConnection{id: "test"}
pool.Put(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), c)
time.Sleep(6 * time.Second)
assert.Bool(c.closed).IsTrue()
conn := pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
assert.Pointer(conn).IsNil()
}

View File

@ -96,13 +96,6 @@ func (v *Config) GetReceivingBufferSize() uint32 {
return v.GetReadBufferSize() / v.GetMTUValue()
}
func (v *Config) IsConnectionReuse() bool {
if v == nil || v.ConnectionReuse == nil {
return true
}
return v.ConnectionReuse.Enable
}
func init() {
common.Must(internet.RegisterProtocolConfigCreator(internet.TransportProtocol_MKCP, func() interface{} {
return new(Config)

View File

@ -143,7 +143,6 @@ type Config struct {
WriteBuffer *WriteBuffer `protobuf:"bytes,6,opt,name=write_buffer,json=writeBuffer" json:"write_buffer,omitempty"`
ReadBuffer *ReadBuffer `protobuf:"bytes,7,opt,name=read_buffer,json=readBuffer" json:"read_buffer,omitempty"`
HeaderConfig *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,8,opt,name=header_config,json=headerConfig" json:"header_config,omitempty"`
ConnectionReuse *ConnectionReuse `protobuf:"bytes,9,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"`
}
func (m *Config) Reset() { *m = Config{} }
@ -207,13 +206,6 @@ func (m *Config) GetHeaderConfig() *v2ray_core_common_serial.TypedMessage {
return nil
}
func (m *Config) GetConnectionReuse() *ConnectionReuse {
if m != nil {
return m.ConnectionReuse
}
return nil
}
func init() {
proto.RegisterType((*MTU)(nil), "v2ray.core.transport.internet.kcp.MTU")
proto.RegisterType((*TTI)(nil), "v2ray.core.transport.internet.kcp.TTI")
@ -228,36 +220,35 @@ func init() {
func init() { proto.RegisterFile("v2ray.com/core/transport/internet/kcp/config.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 487 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0x5f, 0x6f, 0xd3, 0x30,
0x14, 0xc5, 0xd5, 0x75, 0x2d, 0xe3, 0x76, 0x5b, 0x4b, 0x84, 0x50, 0x04, 0x12, 0x5a, 0x2b, 0x31,
0x8d, 0x07, 0x1c, 0xc8, 0x5e, 0x78, 0x5e, 0x79, 0xa9, 0xa6, 0x22, 0xb0, 0x52, 0x90, 0x26, 0xa1,
0xe0, 0x3a, 0xb7, 0x25, 0x6a, 0x63, 0x47, 0x8e, 0xb3, 0xaa, 0x7c, 0x23, 0xf8, 0x94, 0xc8, 0x4e,
0xd3, 0x7f, 0x68, 0x6b, 0xde, 0x6a, 0xdf, 0x73, 0x7f, 0xae, 0xce, 0x3d, 0x37, 0xe0, 0xdf, 0xfb,
0x8a, 0x2d, 0x09, 0x97, 0x89, 0xc7, 0xa5, 0x42, 0x4f, 0x2b, 0x26, 0xb2, 0x54, 0x2a, 0xed, 0xc5,
0x42, 0xa3, 0x12, 0xa8, 0xbd, 0x19, 0x4f, 0x3d, 0x2e, 0xc5, 0x24, 0x9e, 0x92, 0x54, 0x49, 0x2d,
0x9d, 0x6e, 0xd9, 0xa3, 0x90, 0xac, 0xf5, 0xa4, 0xd4, 0x93, 0x19, 0x4f, 0x5f, 0xbe, 0xdf, 0xc3,
0x72, 0x99, 0x24, 0x52, 0x78, 0x19, 0xaa, 0x98, 0xcd, 0x3d, 0xbd, 0x4c, 0x31, 0x0a, 0x13, 0xcc,
0x32, 0x36, 0xc5, 0x02, 0xda, 0x7b, 0x05, 0xf5, 0x61, 0x30, 0x72, 0x9e, 0x43, 0xe3, 0x9e, 0xcd,
0x73, 0x74, 0x6b, 0x17, 0xb5, 0xab, 0x33, 0x5a, 0x1c, 0x4c, 0x31, 0x08, 0x06, 0x0f, 0x14, 0x2f,
0xe1, 0x7c, 0x94, 0xce, 0x63, 0x31, 0xeb, 0xb3, 0x94, 0xf1, 0x58, 0x2f, 0x1f, 0xd0, 0x5d, 0x41,
0xe7, 0x93, 0x5c, 0x88, 0x0a, 0xca, 0x2e, 0xb4, 0xbe, 0xab, 0x58, 0xe3, 0x4d, 0x3e, 0x99, 0xa0,
0x72, 0x1c, 0x38, 0xce, 0xe2, 0xdf, 0xa5, 0xc6, 0xfe, 0xee, 0x5d, 0x00, 0x50, 0x64, 0xd1, 0x23,
0x8a, 0xb7, 0xd0, 0xee, 0x4b, 0x21, 0x90, 0xeb, 0x58, 0x0a, 0x8a, 0x79, 0x86, 0xce, 0x0b, 0x68,
0xa2, 0x60, 0xe3, 0x79, 0x21, 0x3c, 0xa1, 0xab, 0x53, 0xef, 0x4f, 0x03, 0x9a, 0x7d, 0xeb, 0xb0,
0xf3, 0x11, 0xea, 0x89, 0xce, 0x6d, 0xbd, 0xe5, 0x5f, 0x92, 0x83, 0x4e, 0x93, 0x61, 0x30, 0xa2,
0xa6, 0xc5, 0x74, 0x6a, 0x1d, 0xbb, 0x47, 0x95, 0x3b, 0x83, 0x60, 0x40, 0x4d, 0x8b, 0x73, 0x07,
0xed, 0xdc, 0x1a, 0x18, 0xf2, 0x95, 0x2f, 0x6e, 0xdd, 0x52, 0x3e, 0x54, 0xa0, 0xec, 0x5a, 0x4f,
0xcf, 0xf3, 0xdd, 0x51, 0xfc, 0x84, 0x67, 0xd1, 0xca, 0xf4, 0x0d, 0xfd, 0xd8, 0xd2, 0xaf, 0x2b,
0xd0, 0xf7, 0x07, 0x46, 0x3b, 0xd1, 0xfe, 0x08, 0x5f, 0x03, 0x70, 0x29, 0xa6, 0x98, 0x19, 0x9f,
0xdd, 0x86, 0x35, 0x76, 0xeb, 0xc6, 0xf9, 0x0a, 0xa7, 0x0b, 0x33, 0xcc, 0x70, 0x6c, 0x67, 0xe5,
0x36, 0xed, 0xe3, 0xa4, 0xc2, 0xe3, 0x5b, 0x19, 0xa0, 0xad, 0xc5, 0x56, 0x20, 0x3e, 0x43, 0x4b,
0x21, 0x8b, 0x4a, 0xe2, 0x13, 0x4b, 0x7c, 0x57, 0x81, 0xb8, 0x89, 0x0c, 0x05, 0xb5, 0x89, 0xcf,
0x2d, 0x9c, 0xfd, 0x42, 0x16, 0xa1, 0x0a, 0x8b, 0x3d, 0x73, 0x4f, 0xfe, 0x1f, 0x62, 0xb1, 0x41,
0xa4, 0xd8, 0x20, 0x12, 0x98, 0x0d, 0x1a, 0x16, 0x0b, 0x44, 0x4f, 0x8b, 0xe6, 0x55, 0x82, 0x7e,
0x40, 0x87, 0xaf, 0x73, 0x17, 0x2a, 0x13, 0x3c, 0xf7, 0xa9, 0xe5, 0xf9, 0x15, 0xfe, 0xe1, 0x5e,
0x64, 0x69, 0x9b, 0xef, 0x5e, 0xdc, 0x50, 0x78, 0xc3, 0x65, 0x72, 0x98, 0xf4, 0xa5, 0x76, 0x57,
0x9f, 0xf1, 0xf4, 0xef, 0x51, 0xf7, 0x9b, 0x4f, 0xd9, 0x92, 0xf4, 0x8d, 0x34, 0x58, 0x4b, 0x07,
0xa5, 0xf4, 0x96, 0xa7, 0xe3, 0xa6, 0xfd, 0x04, 0x5c, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x1d,
0x04, 0x18, 0x2b, 0x8d, 0x04, 0x00, 0x00,
// 471 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0x5f, 0x6f, 0xd3, 0x3e,
0x14, 0x55, 0xd7, 0xae, 0xbf, 0xfe, 0x6e, 0xf7, 0xa7, 0x44, 0x08, 0x45, 0x20, 0xa1, 0xb5, 0x12,
0xd3, 0x78, 0xc0, 0x81, 0xee, 0x85, 0xe7, 0x95, 0x97, 0x32, 0x15, 0x81, 0x95, 0x82, 0xb4, 0x97,
0xe2, 0x3a, 0xb7, 0xc5, 0x6a, 0x63, 0x5b, 0x8e, 0xb3, 0xaa, 0x7c, 0x24, 0x3e, 0x0d, 0x1f, 0x09,
0xc5, 0x6e, 0xd6, 0xae, 0x68, 0x2c, 0x6f, 0x71, 0xee, 0x39, 0xc7, 0xd6, 0x39, 0xf7, 0x40, 0xff,
0xb6, 0x6f, 0xd8, 0x9a, 0x70, 0x95, 0x46, 0x5c, 0x19, 0x8c, 0xac, 0x61, 0x32, 0xd3, 0xca, 0xd8,
0x48, 0x48, 0x8b, 0x46, 0xa2, 0x8d, 0x16, 0x5c, 0x47, 0x5c, 0xc9, 0x99, 0x98, 0x13, 0x6d, 0x94,
0x55, 0x41, 0xb7, 0xe4, 0x18, 0x24, 0x77, 0x78, 0x52, 0xe2, 0xc9, 0x82, 0xeb, 0xe7, 0x6f, 0xf7,
0x64, 0xb9, 0x4a, 0x53, 0x25, 0xa3, 0x0c, 0x8d, 0x60, 0xcb, 0xc8, 0xae, 0x35, 0x26, 0x93, 0x14,
0xb3, 0x8c, 0xcd, 0xd1, 0x8b, 0xf6, 0x5e, 0x40, 0x7d, 0x14, 0x8f, 0x83, 0xa7, 0x70, 0x78, 0xcb,
0x96, 0x39, 0x86, 0xb5, 0xb3, 0xda, 0xc5, 0x31, 0xf5, 0x87, 0x62, 0x18, 0xc7, 0xc3, 0x07, 0x86,
0xe7, 0x70, 0x32, 0xd6, 0x4b, 0x21, 0x17, 0x03, 0xa6, 0x19, 0x17, 0x76, 0xfd, 0x00, 0xee, 0x02,
0x3a, 0x1f, 0xd4, 0x4a, 0x56, 0x40, 0x76, 0xa1, 0xfd, 0xcd, 0x08, 0x8b, 0x57, 0xf9, 0x6c, 0x86,
0x26, 0x08, 0xa0, 0x91, 0x89, 0x9f, 0x25, 0xc6, 0x7d, 0xf7, 0xce, 0x00, 0x28, 0xb2, 0xe4, 0x1f,
0x88, 0xd7, 0x70, 0x3a, 0x50, 0x52, 0x22, 0xb7, 0x42, 0x49, 0x8a, 0x79, 0x86, 0xc1, 0x33, 0x68,
0xa2, 0x64, 0xd3, 0xa5, 0x07, 0xb6, 0xe8, 0xe6, 0xd4, 0xfb, 0xdd, 0x80, 0xe6, 0xc0, 0x39, 0x1c,
0xbc, 0x87, 0x7a, 0x6a, 0x73, 0x37, 0x6f, 0xf7, 0xcf, 0xc9, 0xa3, 0x4e, 0x93, 0x51, 0x3c, 0xa6,
0x05, 0xa5, 0x60, 0x5a, 0x2b, 0xc2, 0x83, 0xca, 0xcc, 0x38, 0x1e, 0xd2, 0x82, 0x12, 0xdc, 0xc0,
0x69, 0xee, 0x0c, 0x9c, 0xf0, 0x8d, 0x2f, 0x61, 0xdd, 0xa9, 0xbc, 0xab, 0xa0, 0x72, 0xdf, 0x7a,
0x7a, 0x92, 0xdf, 0x8f, 0xe2, 0x3b, 0x3c, 0x49, 0x36, 0xa6, 0x6f, 0xd5, 0x1b, 0x4e, 0xfd, 0xb2,
0x82, 0xfa, 0x7e, 0x60, 0xb4, 0x93, 0xec, 0x47, 0xf8, 0x12, 0x80, 0x2b, 0x39, 0xc7, 0xac, 0xf0,
0x39, 0x3c, 0x74, 0xc6, 0xee, 0xfc, 0x09, 0xbe, 0xc0, 0xd1, 0xaa, 0x08, 0x73, 0x32, 0x75, 0x59,
0x85, 0x4d, 0x77, 0x39, 0xa9, 0x70, 0xf9, 0xce, 0x0e, 0xd0, 0xf6, 0x6a, 0x67, 0x21, 0x3e, 0x41,
0xdb, 0x20, 0x4b, 0x4a, 0xc5, 0xff, 0x9c, 0xe2, 0x9b, 0x0a, 0x8a, 0xdb, 0x95, 0xa1, 0x60, 0xb6,
0xeb, 0x73, 0x0d, 0xc7, 0x3f, 0x90, 0x25, 0x68, 0x26, 0xbe, 0x67, 0x61, 0xeb, 0xef, 0x10, 0x7d,
0x83, 0x88, 0x6f, 0x10, 0x89, 0x8b, 0x06, 0x8d, 0x7c, 0x81, 0xe8, 0x91, 0x27, 0xfb, 0x0d, 0xfa,
0xd8, 0x68, 0xfd, 0xdf, 0x81, 0x2b, 0x0a, 0xaf, 0xb8, 0x4a, 0x1f, 0x7f, 0xd2, 0xe7, 0xda, 0x4d,
0x7d, 0xc1, 0xf5, 0xaf, 0x83, 0xee, 0xd7, 0x3e, 0x65, 0x6b, 0x32, 0x28, 0xa0, 0xf1, 0x1d, 0x74,
0x58, 0x42, 0xaf, 0xb9, 0x9e, 0x36, 0x5d, 0x53, 0x2f, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x54,
0xdd, 0xba, 0xf9, 0x34, 0x04, 0x00, 0x00,
}

View File

@ -51,5 +51,5 @@ message Config {
WriteBuffer write_buffer = 6;
ReadBuffer read_buffer = 7;
v2ray.core.common.serial.TypedMessage header_config = 8;
ConnectionReuse connection_reuse = 9;
reserved 9;
}

View File

@ -10,7 +10,6 @@ import (
"v2ray.com/core/app/log"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/predicate"
"v2ray.com/core/transport/internet/internal"
)
var (
@ -165,7 +164,6 @@ func (u *Updater) SetInterval(d time.Duration) {
type SystemConnection interface {
net.Conn
Id() internal.ConnectionID
Reset(func([]Segment))
Overhead() int
}
@ -173,7 +171,6 @@ type SystemConnection interface {
// Connection is a KCP connection over UDP.
type Connection struct {
conn SystemConnection
connRecycler internal.ConnectionRecyler
rd time.Time
wd time.Time // write deadline
since int64
@ -197,18 +194,15 @@ type Connection struct {
dataUpdater *Updater
pingUpdater *Updater
reusable bool
}
// NewConnection create a new KCP connection between local and remote.
func NewConnection(conv uint16, sysConn SystemConnection, recycler internal.ConnectionRecyler, config *Config) *Connection {
func NewConnection(conv uint16, sysConn SystemConnection, config *Config) *Connection {
log.Trace(errors.New("KCP|Connection: creating connection ", conv))
conn := &Connection{
conv: conv,
conn: sysConn,
connRecycler: recycler,
since: nowMillisec(),
dataInput: make(chan bool, 1),
dataOutput: make(chan bool, 1),
@ -443,14 +437,6 @@ func (v *Connection) updateTask() {
v.flush()
}
func (v *Connection) Reusable() bool {
return v.Config.IsConnectionReuse() && v.reusable
}
func (v *Connection) SetReusable(b bool) {
v.reusable = b
}
func (v *Connection) Terminate() {
if v == nil {
return
@ -461,11 +447,7 @@ func (v *Connection) Terminate() {
v.OnDataInput()
v.OnDataOutput()
if v.Config.IsConnectionReuse() && v.reusable {
v.connRecycler.Put(v.conn.Id(), v.conn)
} else {
v.conn.Close()
}
v.sendingWorker.Release()
v.receivingWorker.Release()
}

View File

@ -6,7 +6,6 @@ import (
"time"
"v2ray.com/core/testing/assert"
"v2ray.com/core/transport/internet/internal"
. "v2ray.com/core/transport/internet/kcp"
)
@ -48,20 +47,12 @@ func (o *NoOpConn) SetWriteDeadline(time.Time) error {
return nil
}
func (o *NoOpConn) Id() internal.ConnectionID {
return internal.ConnectionID{}
}
func (o *NoOpConn) Reset(input func([]Segment)) {}
type NoOpRecycler struct{}
func (o *NoOpRecycler) Put(internal.ConnectionID, net.Conn) {}
func TestConnectionReadTimeout(t *testing.T) {
assert := assert.On(t)
conn := NewConnection(1, &NoOpConn{}, &NoOpRecycler{}, &Config{})
conn := NewConnection(1, &NoOpConn{}, &Config{})
conn.SetReadDeadline(time.Now().Add(time.Second))
b := make([]byte, 1024)

View File

@ -15,19 +15,16 @@ import (
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
)
var (
globalConv = uint32(dice.RandomUint16())
globalPool = internal.NewConnectionPool()
)
type ClientConnection struct {
sync.RWMutex
net.Conn
id internal.ConnectionID
input func([]Segment)
reader PacketReader
writer PacketWriter
@ -57,10 +54,6 @@ func (o *ClientConnection) Read([]byte) (int, error) {
panic("KCP|ClientConnection: Read should not be called.")
}
func (o *ClientConnection) Id() internal.ConnectionID {
return o.id
}
func (o *ClientConnection) Close() error {
return o.Conn.Close()
}
@ -114,25 +107,18 @@ func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection,
log.Trace(errors.New("KCP|Dialer: Dialing KCP to ", dest))
src := internet.DialerSourceFromContext(ctx)
id := internal.NewConnectionID(src, dest)
conn := globalPool.Get(id)
if conn == nil {
rawConn, err := internet.DialSystem(ctx, src, dest)
if err != nil {
log.Trace(errors.New("KCP|Dialer: Failed to dial to dest: ", err).AtError())
return nil, err
}
c := &ClientConnection{
conn := &ClientConnection{
Conn: rawConn,
id: id,
}
go c.Run()
conn = c
}
go conn.Run()
kcpSettings := internet.TransportSettingsFromContext(ctx).(*Config)
clientConn := conn.(*ClientConnection)
header, err := kcpSettings.GetPackerHeader()
if err != nil {
return nil, errors.New("KCP|Dialer: Failed to create packet header.").Base(err)
@ -141,9 +127,9 @@ func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection,
if err != nil {
return nil, errors.New("KCP|Dialer: Failed to create security.").Base(err)
}
clientConn.ResetSecurity(header, security)
conn.ResetSecurity(header, security)
conv := uint16(atomic.AddUint32(&globalConv, 1))
session := NewConnection(conv, clientConn, globalPool, kcpSettings)
session := NewConnection(conv, conn, kcpSettings)
var iConn internet.Connection
iConn = session
@ -156,7 +142,7 @@ func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection,
config.ServerName = dest.Address.Domain()
}
tlsConn := tls.Client(iConn, config)
iConn = UnreusableConnection{Conn: tlsConn}
iConn = tlsConn
}
}

View File

@ -15,7 +15,6 @@ import (
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
"v2ray.com/core/transport/internet/udp"
)
@ -27,7 +26,6 @@ type ConnectionID struct {
}
type ServerConnection struct {
id internal.ConnectionID
local net.Addr
remote net.Addr
writer PacketWriter
@ -73,10 +71,6 @@ func (*ServerConnection) SetWriteDeadline(time.Time) error {
return nil
}
func (c *ServerConnection) Id() internal.ConnectionID {
return c.id
}
// Listener defines a server listening for connections
type Listener struct {
sync.Mutex
@ -94,7 +88,6 @@ type Listener struct {
func NewListener(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- internet.Connection) (*Listener, error) {
networkSettings := internet.TransportSettingsFromContext(ctx)
kcpSettings := networkSettings.(*Config)
kcpSettings.ConnectionReuse = &ConnectionReuse{Enable: false}
header, err := kcpSettings.GetPackerHeader()
if err != nil {
@ -182,7 +175,6 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src v2net.Destination, origina
}
localAddr := v.hub.Addr()
sConn := &ServerConnection{
id: internal.NewConnectionID(v2net.LocalHostIP, src),
local: localAddr,
remote: remoteAddr,
writer: &KCPPacketWriter{
@ -192,17 +184,16 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src v2net.Destination, origina
},
closer: writer,
}
conn = NewConnection(conv, sConn, v, v.config)
conn = NewConnection(conv, sConn, v.config)
var netConn internet.Connection = conn
if v.tlsConfig != nil {
tlsConn := tls.Server(conn, v.tlsConfig)
netConn = UnreusableConnection{Conn: tlsConn}
netConn = tlsConn
}
select {
case v.conns <- netConn:
case <-time.After(time.Second * 5):
conn.SetReusable(false)
conn.Close()
return
}
@ -248,8 +239,6 @@ func (v *Listener) Addr() net.Addr {
return v.hub.Addr()
}
func (v *Listener) Put(internal.ConnectionID, net.Conn) {}
type Writer struct {
id ConnectionID
dest v2net.Destination

View File

@ -1,13 +0,0 @@
package kcp
import "net"
type UnreusableConnection struct {
net.Conn
}
func (c UnreusableConnection) Reusable() bool {
return false
}
func (c UnreusableConnection) SetReusable(bool) {}

View File

@ -5,13 +5,6 @@ import (
"v2ray.com/core/transport/internet"
)
func (v *Config) IsConnectionReuse() bool {
if v == nil || v.ConnectionReuse == nil {
return true
}
return v.ConnectionReuse.Enable
}
func init() {
common.Must(internet.RegisterProtocolConfigCreator(internet.TransportProtocol_TCP, func() interface{} {
return new(Config)

View File

@ -16,38 +16,14 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type ConnectionReuse struct {
Enable bool `protobuf:"varint,1,opt,name=enable" json:"enable,omitempty"`
}
func (m *ConnectionReuse) Reset() { *m = ConnectionReuse{} }
func (m *ConnectionReuse) String() string { return proto.CompactTextString(m) }
func (*ConnectionReuse) ProtoMessage() {}
func (*ConnectionReuse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *ConnectionReuse) GetEnable() bool {
if m != nil {
return m.Enable
}
return false
}
type Config struct {
ConnectionReuse *ConnectionReuse `protobuf:"bytes,1,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"`
HeaderSettings *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,2,opt,name=header_settings,json=headerSettings" json:"header_settings,omitempty"`
}
func (m *Config) Reset() { *m = Config{} }
func (m *Config) String() string { return proto.CompactTextString(m) }
func (*Config) ProtoMessage() {}
func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Config) GetConnectionReuse() *ConnectionReuse {
if m != nil {
return m.ConnectionReuse
}
return nil
}
func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Config) GetHeaderSettings() *v2ray_core_common_serial.TypedMessage {
if m != nil {
@ -57,30 +33,25 @@ func (m *Config) GetHeaderSettings() *v2ray_core_common_serial.TypedMessage {
}
func init() {
proto.RegisterType((*ConnectionReuse)(nil), "v2ray.core.transport.internet.tcp.ConnectionReuse")
proto.RegisterType((*Config)(nil), "v2ray.core.transport.internet.tcp.Config")
}
func init() { proto.RegisterFile("v2ray.com/core/transport/internet/tcp/config.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 273 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xc1, 0x4a, 0xf4, 0x30,
0x14, 0x85, 0xe9, 0xfc, 0x50, 0x7e, 0x22, 0x58, 0xe9, 0x42, 0x06, 0x57, 0xce, 0x80, 0xa2, 0x9b,
0x44, 0xea, 0x1b, 0xd8, 0x95, 0x0b, 0x51, 0x62, 0x71, 0x21, 0x48, 0xc9, 0xdc, 0xb9, 0xd6, 0xc2,
0x34, 0x37, 0x24, 0x57, 0xa1, 0xaf, 0xe4, 0x13, 0xf8, 0x78, 0xd2, 0x76, 0x5a, 0xa4, 0x9b, 0x59,
0x06, 0xbe, 0xef, 0xe4, 0x9c, 0x2b, 0xb2, 0xaf, 0xcc, 0x9b, 0x56, 0x02, 0x35, 0x0a, 0xc8, 0xa3,
0x62, 0x6f, 0x6c, 0x70, 0xe4, 0x59, 0xd5, 0x96, 0xd1, 0x5b, 0x64, 0xc5, 0xe0, 0x14, 0x90, 0x7d,
0xaf, 0x2b, 0xe9, 0x3c, 0x31, 0xa5, 0xab, 0xd1, 0xf1, 0x28, 0x27, 0x5e, 0x8e, 0xbc, 0x64, 0x70,
0x67, 0x37, 0xb3, 0x58, 0xa0, 0xa6, 0x21, 0xab, 0x02, 0xfa, 0xda, 0xec, 0x14, 0xb7, 0x0e, 0xb7,
0x65, 0x83, 0x21, 0x98, 0x0a, 0x87, 0xd0, 0xf5, 0xb5, 0x48, 0x72, 0xb2, 0x16, 0x81, 0x6b, 0xb2,
0x1a, 0x3f, 0x03, 0xa6, 0xa7, 0x22, 0x46, 0x6b, 0x36, 0x3b, 0x5c, 0x46, 0xe7, 0xd1, 0xd5, 0x7f,
0xbd, 0x7f, 0xad, 0x7f, 0x22, 0x11, 0xe7, 0x7d, 0xa1, 0xf4, 0x4d, 0x9c, 0xc0, 0x64, 0x95, 0xbe,
0xd3, 0x7a, 0xf8, 0x28, 0xcb, 0xe4, 0xc1, 0x96, 0x72, 0xf6, 0xa1, 0x4e, 0x60, 0xd6, 0xe0, 0x51,
0x24, 0x1f, 0x68, 0xb6, 0xe8, 0xcb, 0x80, 0xcc, 0xb5, 0xad, 0xc2, 0x72, 0xd1, 0xa7, 0x5f, 0xfe,
0x4d, 0x1f, 0xc6, 0xc9, 0x61, 0x9c, 0x2c, 0xba, 0x71, 0x0f, 0xc3, 0x36, 0x7d, 0x3c, 0xe8, 0xcf,
0x7b, 0xfb, 0x4e, 0x8b, 0x0b, 0xa0, 0xe6, 0x70, 0xb5, 0xa7, 0xe8, 0xf5, 0x1f, 0x83, 0xfb, 0x5e,
0xac, 0x5e, 0x32, 0x6d, 0x5a, 0x99, 0x77, 0x68, 0x31, 0xa1, 0xf7, 0x23, 0x5a, 0x80, 0xdb, 0xc4,
0xfd, 0x01, 0x6f, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x17, 0x2c, 0x8e, 0x55, 0xcb, 0x01, 0x00,
0x00,
// 223 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8e, 0xc1, 0x4a, 0xc4, 0x30,
0x10, 0x86, 0x69, 0x95, 0x45, 0x2a, 0xa8, 0xec, 0x49, 0x3c, 0xb9, 0x82, 0xe2, 0x69, 0x22, 0xf1,
0x0d, 0xdc, 0x93, 0x82, 0x28, 0xb5, 0x78, 0xf0, 0x52, 0xe2, 0xec, 0x58, 0x03, 0x26, 0x13, 0x26,
0x83, 0xd0, 0x57, 0xf2, 0x29, 0x65, 0x37, 0x76, 0x11, 0x2f, 0xde, 0xbf, 0xef, 0xfb, 0xff, 0xc6,
0x7e, 0x5a, 0x71, 0x23, 0x20, 0x07, 0x83, 0x2c, 0x64, 0x54, 0x5c, 0xcc, 0x89, 0x45, 0x8d, 0x8f,
0x4a, 0x12, 0x49, 0x8d, 0x62, 0x32, 0xc8, 0xf1, 0xcd, 0x0f, 0x90, 0x84, 0x95, 0xe7, 0x8b, 0xc9,
0x11, 0x82, 0x2d, 0x0f, 0x13, 0x0f, 0x8a, 0xe9, 0xe4, 0xea, 0x4f, 0x16, 0x39, 0x04, 0x8e, 0x26,
0x93, 0x78, 0xf7, 0x61, 0x74, 0x4c, 0xb4, 0xea, 0x03, 0xe5, 0xec, 0x06, 0x2a, 0xd1, 0xb3, 0xbe,
0x99, 0x2d, 0x37, 0x23, 0xf3, 0x87, 0xe6, 0xf0, 0x9d, 0xdc, 0x8a, 0xa4, 0xcf, 0xa4, 0xea, 0xe3,
0x90, 0x8f, 0xeb, 0xd3, 0xea, 0x72, 0xdf, 0x5e, 0xc0, 0xaf, 0xe1, 0x52, 0x84, 0x52, 0x84, 0x6e,
0x5d, 0xbc, 0x2f, 0xc1, 0xf6, 0xa0, 0xe8, 0x4f, 0x3f, 0xf6, 0xdd, 0xee, 0x5e, 0x75, 0x54, 0xdf,
0xb4, 0xcd, 0x39, 0x72, 0x80, 0x7f, 0xbf, 0x3f, 0x56, 0x2f, 0x3b, 0x8a, 0xe9, 0xab, 0x5e, 0x3c,
0xdb, 0xd6, 0x8d, 0xb0, 0x5c, 0xa3, 0xdd, 0x16, 0xbd, 0x9d, 0xd0, 0x0e, 0xd3, 0xeb, 0x6c, 0xf3,
0xfd, 0xfa, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x15, 0xf9, 0x1f, 0xa0, 0x46, 0x01, 0x00, 0x00,
}

View File

@ -8,10 +8,7 @@ option java_multiple_files = true;
import "v2ray.com/core/common/serial/typed_message.proto";
message ConnectionReuse {
bool enable = 1;
}
message Config {
ConnectionReuse connection_reuse = 1;
reserved 1;
v2ray.core.common.serial.TypedMessage header_settings = 2;
}

View File

@ -3,35 +3,22 @@ package tcp
import (
"context"
"crypto/tls"
"net"
"v2ray.com/core/app/log"
"v2ray.com/core/common"
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
)
var (
globalCache = internal.NewConnectionPool()
)
func Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, error) {
log.Trace(errors.New("Internet|TCP: Dailing TCP to ", dest))
src := internet.DialerSourceFromContext(ctx)
tcpSettings := internet.TransportSettingsFromContext(ctx).(*Config)
id := internal.NewConnectionID(src, dest)
var conn net.Conn
if dest.Network == v2net.Network_TCP && tcpSettings.IsConnectionReuse() {
conn = globalCache.Get(id)
}
if conn == nil {
var err error
conn, err = internet.DialSystem(ctx, src, dest)
conn, err := internet.DialSystem(ctx, src, dest)
if err != nil {
return nil, err
}
@ -56,8 +43,7 @@ func Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, err
}
conn = auth.Client(conn)
}
}
return internal.NewConnection(id, conn, globalCache, internal.ReuseConnection(tcpSettings.IsConnectionReuse())), nil
return internet.Connection(conn), nil
}
func init() {

View File

@ -12,7 +12,6 @@ import (
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/retry"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
)
@ -97,23 +96,13 @@ func (v *TCPListener) KeepAccepting() {
}
select {
case v.conns <- internal.NewConnection(internal.ConnectionID{}, conn, v, internal.ReuseConnection(v.config.IsConnectionReuse())):
case v.conns <- internet.Connection(conn):
case <-time.After(time.Second * 5):
conn.Close()
}
}
}
func (v *TCPListener) Put(id internal.ConnectionID, conn net.Conn) {
select {
case v.conns <- internal.NewConnection(internal.ConnectionID{}, conn, v, internal.ReuseConnection(v.config.IsConnectionReuse())):
case <-time.After(time.Second * 5):
conn.Close()
case <-v.ctx.Done():
conn.Close()
}
}
func (v *TCPListener) Addr() net.Addr {
return v.listener.Addr()
}

View File

@ -6,7 +6,6 @@ import (
"v2ray.com/core/common"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
)
func init() {
@ -18,6 +17,6 @@ func init() {
return nil, err
}
// TODO: handle dialer options
return internal.NewConnection(internal.NewConnectionID(src, dest), conn, internal.NoOpConnectionRecyler{}, internal.ReuseConnection(false)), nil
return internet.Connection(conn), nil
}))
}

View File

@ -5,13 +5,6 @@ import (
"v2ray.com/core/transport/internet"
)
func (c *Config) IsConnectionReuse() bool {
if c == nil || c.ConnectionReuse == nil {
return true
}
return c.ConnectionReuse.Enable
}
func (c *Config) GetNormailzedPath() string {
path := c.Path
if len(path) == 0 {

View File

@ -32,8 +32,6 @@ func (m *ConnectionReuse) GetEnable() bool {
}
type Config struct {
// Whether or not to reuse WebSocket connections.
ConnectionReuse *ConnectionReuse `protobuf:"bytes,1,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"`
// URL path to the WebSocket service. Empty value means root(/).
Path string `protobuf:"bytes,2,opt,name=path" json:"path,omitempty"`
}
@ -43,13 +41,6 @@ func (m *Config) String() string { return proto.CompactTextString(m)
func (*Config) ProtoMessage() {}
func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Config) GetConnectionReuse() *ConnectionReuse {
if m != nil {
return m.ConnectionReuse
}
return nil
}
func (m *Config) GetPath() string {
if m != nil {
return m.Path
@ -67,20 +58,18 @@ func init() {
}
var fileDescriptor0 = []byte{
// 226 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0xd0, 0xb1, 0x4a, 0x03, 0x41,
0x10, 0x80, 0x61, 0x36, 0xc8, 0x61, 0xd6, 0x22, 0x72, 0x85, 0xa4, 0x0c, 0x69, 0x12, 0x11, 0x76,
0xe1, 0x6c, 0x52, 0x7b, 0x95, 0x9d, 0x2c, 0xa2, 0x60, 0x23, 0x7b, 0xe3, 0xa8, 0x87, 0x66, 0xe6,
0x98, 0x1b, 0x95, 0x94, 0xbe, 0x8e, 0x4f, 0x29, 0x39, 0xb2, 0x5b, 0xa4, 0xba, 0x6e, 0x07, 0xe6,
0xe3, 0x5f, 0xc6, 0x6e, 0xbe, 0x2b, 0x89, 0x3b, 0x07, 0xbc, 0xf5, 0xc0, 0x82, 0x5e, 0x25, 0x52,
0xdf, 0xb1, 0xa8, 0x6f, 0x49, 0x51, 0x08, 0xd5, 0xff, 0x60, 0xd3, 0x33, 0x7c, 0xa0, 0x7a, 0x60,
0x7a, 0x6d, 0xdf, 0x5c, 0x27, 0xac, 0x5c, 0xae, 0x92, 0x14, 0x74, 0x59, 0xb9, 0xa4, 0x5c, 0x56,
0xcb, 0x4b, 0x3b, 0xab, 0x99, 0x08, 0x41, 0x5b, 0xa6, 0x80, 0x5f, 0x3d, 0x96, 0x17, 0xb6, 0x40,
0x8a, 0xcd, 0x27, 0xce, 0xcd, 0xc2, 0xac, 0x4f, 0xc3, 0x61, 0x5a, 0xfe, 0x1a, 0x5b, 0xd4, 0x43,
0xa4, 0x04, 0x7b, 0x0e, 0x59, 0x3d, 0xcb, 0x9e, 0x0d, 0xcb, 0x67, 0xd5, 0xc6, 0x8d, 0x2c, 0xbb,
0xa3, 0x6c, 0x98, 0xc1, 0xd1, 0x3f, 0x4a, 0x7b, 0xd2, 0x45, 0x7d, 0x9f, 0x4f, 0x16, 0x66, 0x3d,
0x0d, 0xc3, 0xfb, 0xe6, 0xc5, 0x5e, 0x01, 0x6f, 0xc7, 0x36, 0xee, 0xcc, 0xd3, 0x34, 0x0f, 0x7f,
0x93, 0xd5, 0x43, 0x15, 0xe2, 0xce, 0xd5, 0x7b, 0x76, 0x9f, 0xd9, 0x6d, 0x62, 0x8f, 0x69, 0xb3,
0x29, 0x86, 0x23, 0x5e, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0x22, 0x65, 0x99, 0x41, 0x80, 0x01,
0x00, 0x00,
// 204 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0xcf, 0x31, 0x4b, 0xc7, 0x30,
0x10, 0x05, 0x70, 0x52, 0xfe, 0x94, 0x36, 0x8b, 0x92, 0x41, 0x3a, 0x96, 0x2e, 0xad, 0x08, 0x09,
0xd4, 0xc5, 0xd9, 0x4e, 0x3a, 0x49, 0x10, 0x05, 0xb7, 0x34, 0x9e, 0x5a, 0xb4, 0x77, 0x25, 0x3d,
0x95, 0x7e, 0x25, 0x3f, 0xa5, 0x58, 0x4c, 0xe6, 0xff, 0x76, 0x0f, 0xee, 0xc7, 0xe3, 0xc9, 0xab,
0xaf, 0x3e, 0xb8, 0x4d, 0x7b, 0x9a, 0x8d, 0xa7, 0x00, 0x86, 0x83, 0xc3, 0x75, 0xa1, 0xc0, 0x66,
0x42, 0x86, 0x80, 0xc0, 0xe6, 0x1b, 0xc6, 0x95, 0xfc, 0x3b, 0xb0, 0xf1, 0x84, 0x2f, 0xd3, 0xab,
0x5e, 0x02, 0x31, 0xa9, 0x36, 0xca, 0x00, 0x3a, 0x29, 0x1d, 0x95, 0x4e, 0xaa, 0x39, 0x97, 0x27,
0x03, 0x21, 0x82, 0xe7, 0x89, 0xd0, 0xc2, 0xe7, 0x0a, 0xea, 0x4c, 0xe6, 0x80, 0x6e, 0xfc, 0x80,
0x4a, 0xd4, 0xa2, 0x2b, 0xec, 0x7f, 0x6a, 0x1a, 0x99, 0x0f, 0x7b, 0x87, 0x52, 0xf2, 0xb0, 0x38,
0x7e, 0xab, 0xb2, 0x5a, 0x74, 0xa5, 0xdd, 0xef, 0xdb, 0x43, 0x21, 0x4e, 0xb3, 0xeb, 0x67, 0x79,
0xe1, 0x69, 0xd6, 0x47, 0xb6, 0xdf, 0x89, 0xa7, 0x32, 0x85, 0x9f, 0xac, 0x7d, 0xe8, 0xad, 0xdb,
0xf4, 0xf0, 0xc7, 0xee, 0x13, 0xbb, 0x89, 0xec, 0x31, 0x7e, 0x8e, 0xf9, 0x3e, 0xf2, 0xf2, 0x37,
0x00, 0x00, 0xff, 0xff, 0x7a, 0xf3, 0x2b, 0x77, 0x20, 0x01, 0x00, 0x00,
}

View File

@ -11,8 +11,7 @@ message ConnectionReuse {
}
message Config {
// Whether or not to reuse WebSocket connections.
ConnectionReuse connection_reuse = 1;
reserved 1;
// URL path to the WebSocket service. Empty value means root(/).
string path = 2;

View File

@ -10,32 +10,17 @@ import (
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
)
var (
globalCache = internal.NewConnectionPool()
)
func Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, error) {
log.Trace(errors.New("WebSocket|Dialer: Creating connection to ", dest))
src := internet.DialerSourceFromContext(ctx)
wsSettings := internet.TransportSettingsFromContext(ctx).(*Config)
log.Trace(errors.New("creating connection to ", dest).Path("Transport", "Internet", "WebSocket"))
id := internal.NewConnectionID(src, dest)
var conn net.Conn
if dest.Network == v2net.Network_TCP && wsSettings.IsConnectionReuse() {
conn = globalCache.Get(id)
}
if conn == nil {
var err error
conn, err = dialWebsocket(ctx, dest)
conn, err := dialWebsocket(ctx, dest)
if err != nil {
return nil, errors.New("dial failed").Path("WebSocket", "Dialer")
}
}
return internal.NewConnection(id, conn, globalCache, internal.ReuseConnection(wsSettings.IsConnectionReuse())), nil
return internet.Connection(conn), nil
}
func init() {

View File

@ -15,7 +15,6 @@ import (
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
)
@ -42,7 +41,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
select {
case <-h.ln.ctx.Done():
conn.Close()
case h.ln.conns <- internal.NewConnection(internal.ConnectionID{}, conn, h.ln, internal.ReuseConnection(h.ln.config.IsConnectionReuse())):
case h.ln.conns <- internet.Connection(conn):
case <-time.After(time.Second * 5):
conn.Close()
}
@ -120,16 +119,6 @@ func converttovws(w http.ResponseWriter, r *http.Request) (*connection, error) {
return &connection{wsc: conn}, nil
}
func (ln *Listener) Put(id internal.ConnectionID, conn net.Conn) {
select {
case <-ln.ctx.Done():
conn.Close()
case ln.conns <- internal.NewConnection(internal.ConnectionID{}, conn, ln, internal.ReuseConnection(ln.config.IsConnectionReuse())):
case <-time.After(time.Second * 5):
conn.Close()
}
}
func (ln *Listener) Addr() net.Addr {
return ln.listener.Addr()
}

View File

@ -30,7 +30,6 @@ func Test_listenWSAndDial(t *testing.T) {
n, err := c.Read(b[:])
//assert.Error(err).IsNil()
if err != nil {
c.SetReusable(false)
return
}
assert.Bool(bytes.HasPrefix(b[:n], []byte("Test connection"))).IsTrue()
@ -87,9 +86,6 @@ func Test_listenWSAndDial_TLS(t *testing.T) {
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "wss",
ConnectionReuse: &ConnectionReuse{
Enable: true,
},
})
ctx = internet.ContextWithSecuritySettings(ctx, &v2tls.Config{
AllowInsecure: true,