From a86cd36ad2923d287e98b3666352bee090d2bca2 Mon Sep 17 00:00:00 2001 From: v2ray Date: Thu, 2 Jun 2016 01:49:25 +0200 Subject: [PATCH] configuration for connection reuse --- shell/point/config.go | 2 ++ shell/point/config_json.go | 3 +++ shell/point/point.go | 5 +++++ transport/config.go | 16 +++++++++++++ transport/config_json.go | 36 ++++++++++++++++++++++++++++++ transport/hub/connection.go | 4 +++- transport/hub/connection_cache.go | 37 ++++++++++++++++++++++++++----- transport/hub/dialer.go | 6 ++++- transport/transport.go | 22 ++++++++++++++++++ 9 files changed, 124 insertions(+), 7 deletions(-) create mode 100644 transport/config.go create mode 100644 transport/config_json.go create mode 100644 transport/transport.go diff --git a/shell/point/config.go b/shell/point/config.go index d8a96e51e..e17658c66 100644 --- a/shell/point/config.go +++ b/shell/point/config.go @@ -5,6 +5,7 @@ import ( "github.com/v2ray/v2ray-core/app/router" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport" ) type ConnectionConfig struct { @@ -55,6 +56,7 @@ type Config struct { OutboundConfig *ConnectionConfig InboundDetours []*InboundDetourConfig OutboundDetours []*OutboundDetourConfig + TransportConfig *transport.Config } type ConfigLoader func(init string) (*Config, error) diff --git a/shell/point/config_json.go b/shell/point/config_json.go index 82a3038a8..c1c40d32d 100644 --- a/shell/point/config_json.go +++ b/shell/point/config_json.go @@ -13,6 +13,7 @@ import ( "github.com/v2ray/v2ray-core/app/router" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport" ) const ( @@ -30,6 +31,7 @@ func (this *Config) UnmarshalJSON(data []byte) error { OutboundConfig *ConnectionConfig `json:"outbound"` InboundDetours []*InboundDetourConfig `json:"inboundDetour"` OutboundDetours []*OutboundDetourConfig `json:"outboundDetour"` + Transport *transport.Config `json:"transport"` } jsonConfig := new(JsonConfig) if err := json.Unmarshal(data, jsonConfig); err != nil { @@ -57,6 +59,7 @@ func (this *Config) UnmarshalJSON(data []byte) error { } } this.DNSConfig = jsonConfig.DNSConfig + this.TransportConfig = jsonConfig.Transport return nil } diff --git a/shell/point/point.go b/shell/point/point.go index 2131d86fb..eb1b4e79b 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -16,6 +16,7 @@ import ( "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" proxyrepo "github.com/v2ray/v2ray-core/proxy/repo" + "github.com/v2ray/v2ray-core/transport" ) // Point shell of V2Ray. @@ -38,6 +39,10 @@ func NewPoint(pConfig *Config) (*Point, error) { vpoint.port = pConfig.Port vpoint.listen = pConfig.ListenOn + if pConfig.TransportConfig != nil { + transport.ApplyConfig(pConfig.TransportConfig) + } + if pConfig.LogConfig != nil { logConfig := pConfig.LogConfig if len(logConfig.AccessLog) > 0 { diff --git a/transport/config.go b/transport/config.go new file mode 100644 index 000000000..4dd04a87f --- /dev/null +++ b/transport/config.go @@ -0,0 +1,16 @@ +package transport + +type StreamType int + +const ( + StreamTypeTCP = StreamType(0) +) + +type TCPConfig struct { + ConnectionReuse bool +} + +type Config struct { + StreamType StreamType + TCPConfig *TCPConfig +} diff --git a/transport/config_json.go b/transport/config_json.go new file mode 100644 index 000000000..84354d1f3 --- /dev/null +++ b/transport/config_json.go @@ -0,0 +1,36 @@ +// +build json + +package transport + +import ( + "encoding/json" + "strings" +) + +func (this *Config) UnmarshalJSON(data []byte) error { + type TypeConfig struct { + StreamType string `json:"streamType"` + Settings json.RawMessage `json:"settings"` + } + type JsonTCPConfig struct { + ConnectionReuse bool `json:"connectionReuse"` + } + + typeConfig := new(TypeConfig) + if err := json.Unmarshal(data, typeConfig); err != nil { + return err + } + + streamType := strings.ToLower(typeConfig.StreamType) + if streamType == "tcp" { + jsonTCPConfig := new(JsonTCPConfig) + if err := json.Unmarshal(data, jsonTCPConfig); err != nil { + return err + } + this.TCPConfig = &TCPConfig{ + ConnectionReuse: jsonTCPConfig.ConnectionReuse, + } + } + + return nil +} diff --git a/transport/hub/connection.go b/transport/hub/connection.go index 47283cf42..f00c8c987 100644 --- a/transport/hub/connection.go +++ b/transport/hub/connection.go @@ -3,6 +3,8 @@ package hub import ( "net" "time" + + "github.com/v2ray/v2ray-core/transport" ) type ConnectionHandler func(*Connection) @@ -37,7 +39,7 @@ func (this *Connection) Close() error { if this == nil || this.conn == nil { return ErrorClosedConnection } - if this.Reusable() { + if transport.TCPStreamConfig.ConnectionReuse && this.Reusable() { this.listener.Recycle(this.dest, this.conn) return nil } diff --git a/transport/hub/connection_cache.go b/transport/hub/connection_cache.go index 76524a20b..51fbf93fd 100644 --- a/transport/hub/connection_cache.go +++ b/transport/hub/connection_cache.go @@ -3,9 +3,33 @@ package hub import ( "net" "sync" + "sync/atomic" "time" ) +type Once struct { + m sync.Mutex + done uint32 +} + +func (o *Once) Do(f func()) { + if atomic.LoadUint32(&o.done) == 1 { + return + } + o.m.Lock() + defer o.m.Unlock() + if o.done == 0 { + defer atomic.StoreUint32(&o.done, 1) + f() + } +} + +func (o *Once) Reset() { + o.m.Lock() + defer o.m.Unlock() + atomic.StoreUint32(&o.done, 0) +} + type AwaitingConnection struct { conn net.Conn expire time.Time @@ -17,19 +41,20 @@ func (this *AwaitingConnection) Expired() bool { type ConnectionCache struct { sync.Mutex - cache map[string][]*AwaitingConnection + cache map[string][]*AwaitingConnection + cleanupOnce Once } func NewConnectionCache() *ConnectionCache { - c := &ConnectionCache{ + return &ConnectionCache{ cache: make(map[string][]*AwaitingConnection), } - go c.Cleanup() - return c } func (this *ConnectionCache) Cleanup() { - for { + defer this.cleanupOnce.Reset() + + for len(this.cache) > 0 { time.Sleep(time.Second * 4) this.Lock() for key, value := range this.cache { @@ -74,6 +99,8 @@ func (this *ConnectionCache) Recycle(dest string, conn net.Conn) { list = []*AwaitingConnection{aconn} } this.cache[dest] = list + + go this.cleanupOnce.Do(this.Cleanup) } func FindFirstValid(list []*AwaitingConnection) int { diff --git a/transport/hub/dialer.go b/transport/hub/dialer.go index c841029d2..10c4278a7 100644 --- a/transport/hub/dialer.go +++ b/transport/hub/dialer.go @@ -7,6 +7,7 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport" ) var ( @@ -17,7 +18,10 @@ var ( func Dial(dest v2net.Destination) (*Connection, error) { destStr := dest.String() - conn := globalCache.Get(destStr) + var conn net.Conn + if transport.TCPStreamConfig.ConnectionReuse { + conn = globalCache.Get(destStr) + } if conn == nil { var err error log.Debug("Hub: Dialling new connection to ", dest) diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 000000000..e5bd23658 --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,22 @@ +package transport + +import "github.com/v2ray/v2ray-core/common/log" + +var ( + TCPStreamConfig = &TCPConfig{ + ConnectionReuse: false, + } +) + +func ApplyConfig(config *Config) error { + if config.StreamType == StreamTypeTCP { + if config.TCPConfig != nil { + TCPStreamConfig = config.TCPConfig + if config.TCPConfig.ConnectionReuse { + log.Info("Transport: TCP connection reuse enabled.") + } + } + } + + return nil +}