From 4afad8d31ca5d0225af9bc368e060404ab049e62 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 4 Jan 2017 15:34:21 +0100 Subject: [PATCH] simpify websocket connection --- transport/internet/websocket/connection.go | 99 --------------- .../internet/websocket/connection_cache.go | 114 ------------------ transport/internet/websocket/dialer.go | 7 +- transport/internet/websocket/hub.go | 5 +- transport/internet/websocket/wsconn.go | 2 +- 5 files changed, 8 insertions(+), 219 deletions(-) delete mode 100644 transport/internet/websocket/connection.go delete mode 100644 transport/internet/websocket/connection_cache.go diff --git a/transport/internet/websocket/connection.go b/transport/internet/websocket/connection.go deleted file mode 100644 index 5be16ae48..000000000 --- a/transport/internet/websocket/connection.go +++ /dev/null @@ -1,99 +0,0 @@ -package websocket - -import ( - "io" - "net" - "time" - - "v2ray.com/core/common/errors" -) - -var ( - ErrInvalidConn = errors.New("Invalid Connection.") -) - -type ConnectionManager interface { - Recycle(string, *wsconn) -} - -type Connection struct { - dest string - conn *wsconn - listener ConnectionManager - reusable bool - config *Config -} - -func NewConnection(dest string, conn *wsconn, manager ConnectionManager, config *Config) *Connection { - return &Connection{ - dest: dest, - conn: conn, - listener: manager, - reusable: config.IsConnectionReuse(), - config: config, - } -} - -func (v *Connection) Read(b []byte) (int, error) { - if v == nil || v.conn == nil { - return 0, io.EOF - } - - return v.conn.Read(b) -} - -func (v *Connection) Write(b []byte) (int, error) { - if v == nil || v.conn == nil { - return 0, io.ErrClosedPipe - } - return v.conn.Write(b) -} - -func (v *Connection) Close() error { - if v == nil || v.conn == nil { - return io.ErrClosedPipe - } - if v.Reusable() { - v.listener.Recycle(v.dest, v.conn) - return nil - } - err := v.conn.Close() - v.conn = nil - return err -} - -func (v *Connection) LocalAddr() net.Addr { - return v.conn.LocalAddr() -} - -func (v *Connection) RemoteAddr() net.Addr { - return v.conn.RemoteAddr() -} - -func (v *Connection) SetDeadline(t time.Time) error { - return v.conn.SetDeadline(t) -} - -func (v *Connection) SetReadDeadline(t time.Time) error { - return v.conn.SetReadDeadline(t) -} - -func (v *Connection) SetWriteDeadline(t time.Time) error { - return v.conn.SetWriteDeadline(t) -} - -func (v *Connection) SetReusable(reusable bool) { - v.reusable = reusable -} - -func (v *Connection) Reusable() bool { - return v.config.IsConnectionReuse() && v.reusable -} - -func (v *Connection) SysFd() (int, error) { - return getSysFd(v.conn) -} - -func getSysFd(conn net.Conn) (int, error) { - return 0, ErrInvalidConn -} diff --git a/transport/internet/websocket/connection_cache.go b/transport/internet/websocket/connection_cache.go deleted file mode 100644 index 41afcb3f9..000000000 --- a/transport/internet/websocket/connection_cache.go +++ /dev/null @@ -1,114 +0,0 @@ -package websocket - -import ( - "net" - "sync" - "time" - - "v2ray.com/core/common/log" - "v2ray.com/core/common/signal" -) - -type AwaitingConnection struct { - conn *wsconn - expire time.Time -} - -func (v *AwaitingConnection) Expired() bool { - return v.expire.Before(time.Now()) -} - -type ConnectionCache struct { - sync.Mutex - cache map[string][]*AwaitingConnection - cleanupOnce signal.Once -} - -func NewConnectionCache() *ConnectionCache { - return &ConnectionCache{ - cache: make(map[string][]*AwaitingConnection), - } -} - -func (v *ConnectionCache) Cleanup() { - defer v.cleanupOnce.Reset() - - for len(v.cache) > 0 { - time.Sleep(time.Second * 7) - v.Lock() - for key, value := range v.cache { - size := len(value) - changed := false - for i := 0; i < size; { - if value[i].Expired() { - value[i].conn.Close() - value[i] = value[size-1] - size-- - changed = true - } else { - i++ - } - } - if changed { - for i := size; i < len(value); i++ { - value[i] = nil - } - value = value[:size] - v.cache[key] = value - } - } - v.Unlock() - } -} - -func (v *ConnectionCache) Recycle(dest string, conn *wsconn) { - v.Lock() - defer v.Unlock() - - aconn := &AwaitingConnection{ - conn: conn, - expire: time.Now().Add(time.Second * 7), - } - - var list []*AwaitingConnection - if val, found := v.cache[dest]; found { - val = append(val, aconn) - list = val - } else { - list = []*AwaitingConnection{aconn} - } - v.cache[dest] = list - - go v.cleanupOnce.Do(v.Cleanup) -} - -func FindFirstValid(list []*AwaitingConnection) int { - for idx, conn := range list { - if !conn.Expired() && !conn.conn.connClosing { - return idx - } - go conn.conn.Close() - } - return -1 -} - -func (v *ConnectionCache) Get(dest string) net.Conn { - v.Lock() - defer v.Unlock() - - list, found := v.cache[dest] - if !found { - return nil - } - - firstValid := FindFirstValid(list) - if firstValid == -1 { - delete(v.cache, dest) - return nil - } - res := list[firstValid].conn - list = list[firstValid+1:] - v.cache[dest] = list - log.Debug("WS:Conn Cache used.") - return res -} diff --git a/transport/internet/websocket/dialer.go b/transport/internet/websocket/dialer.go index fe34b3fa7..2caf2069e 100644 --- a/transport/internet/websocket/dialer.go +++ b/transport/internet/websocket/dialer.go @@ -9,11 +9,12 @@ import ( "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/internet/internal" v2tls "v2ray.com/core/transport/internet/tls" ) var ( - globalCache = NewConnectionCache() + globalCache = internal.NewConnectionPool() ) func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) { @@ -27,7 +28,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti } wsSettings := networkSettings.(*Config) - id := src.String() + "-" + dest.NetAddr() + id := internal.NewConnectionID(src, dest) var conn *wsconn if dest.Network == v2net.Network_TCP && wsSettings.IsConnectionReuse() { connt := globalCache.Get(id) @@ -43,7 +44,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti return nil, err } } - return NewConnection(id, conn, globalCache, wsSettings), nil + return internal.NewConnection(id, conn, globalCache, internal.ReuseConnection(wsSettings.IsConnectionReuse())), nil } func init() { diff --git a/transport/internet/websocket/hub.go b/transport/internet/websocket/hub.go index ba90a8be1..1261d5ab1 100644 --- a/transport/internet/websocket/hub.go +++ b/transport/internet/websocket/hub.go @@ -16,6 +16,7 @@ import ( v2tls "v2ray.com/core/transport/internet/tls" "github.com/gorilla/websocket" + "v2ray.com/core/transport/internet/internal" ) var ( @@ -157,14 +158,14 @@ func (v *WSListener) Accept() (internet.Connection, error) { if connErr.err != nil { return nil, connErr.err } - return NewConnection("", connErr.conn.(*wsconn), v, v.config), nil + return internal.NewConnection(internal.ConnectionID{}, connErr.conn.(*wsconn), v, internal.ReuseConnection(v.config.IsConnectionReuse())), nil case <-time.After(time.Second * 2): } } return nil, ErrClosedListener } -func (v *WSListener) Recycle(dest string, conn *wsconn) { +func (v *WSListener) Put(id internal.ConnectionID, conn net.Conn) { v.Lock() defer v.Unlock() if !v.acccepting { diff --git a/transport/internet/websocket/wsconn.go b/transport/internet/websocket/wsconn.go index 4cd1a15a0..c6b1ca2d2 100644 --- a/transport/internet/websocket/wsconn.go +++ b/transport/internet/websocket/wsconn.go @@ -159,7 +159,7 @@ func (ws *wsconn) SetReusable(reusable bool) { } func (ws *wsconn) pingPong() { - pongRcv := make(chan int, 0) + pongRcv := make(chan int, 1) ws.wsc.SetPongHandler(func(data string) error { pongRcv <- 0 return nil