1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-17 14:57:44 -05:00

Merge pull request #170 from xiaokangwang/kcptransport

Add KCP(UDPSession) as a transport for v2ray, reslove #162
This commit is contained in:
Darien Raymond 2016-06-13 16:35:57 +02:00 committed by GitHub
commit 22ce652a25
13 changed files with 450 additions and 8 deletions

View File

@ -18,11 +18,15 @@ type InboundHandlerMeta struct {
Tag string
Address v2net.Address
Port v2net.Port
//Whether this proxy support KCP connections
KcpSupported bool
}
type OutboundHandlerMeta struct {
Tag string
Address v2net.Address
//Whether this proxy support KCP connections
KcpSupported bool
}
// An InboundHandler handles inbound network connections to V2Ray.

View File

@ -106,7 +106,7 @@ func (this *VMessInboundHandler) Start() error {
return nil
}
tcpListener, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.HandleConnection, nil)
tcpListener, err := hub.ListenTCP6(this.meta.Address, this.meta.Port, this.HandleConnection, this.meta, nil)
if err != nil {
log.Error("Unable to listen tcp ", this.meta.Address, ":", this.meta.Port, ": ", err)
return err
@ -220,7 +220,9 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
readFinish.Lock()
}
func (this *VMessInboundHandler) setProxyCap() {
this.meta.KcpSupported = true
}
func init() {
internal.MustRegisterInboundHandlerCreator("vmess",
func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
@ -246,6 +248,8 @@ func init() {
handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager)
}
handler.setProxyCap()
return handler, nil
})
}

View File

@ -34,7 +34,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
err := retry.Timed(5, 100).On(func() error {
rec = this.receiverManager.PickReceiver()
rawConn, err := hub.Dial(this.meta.Address, rec.Destination)
rawConn, err := hub.Dial3(this.meta.Address, rec.Destination, this.meta)
if err != nil {
return err
}
@ -154,14 +154,21 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
return
}
func (this *VMessOutboundHandler) setProxyCap() {
this.meta.KcpSupported = true
}
func init() {
internal.MustRegisterOutboundHandlerCreator("vmess",
func(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
vOutConfig := rawConfig.(*Config)
return &VMessOutboundHandler{
handler := &VMessOutboundHandler{
receiverManager: NewReceiverManager(vOutConfig.Receivers),
meta: meta,
}, nil
}
handler.setProxyCap()
return handler, nil
})
}

View File

@ -1,8 +1,12 @@
package transport
import "github.com/v2ray/v2ray-core/transport/hub/kcpv"
// Config for V2Ray transport layer.
type Config struct {
ConnectionReuse bool
enableKcp bool
kcpConfig *kcpv.Config
}
// Apply applies this Config.
@ -10,5 +14,17 @@ func (this *Config) Apply() error {
if this.ConnectionReuse {
connectionReuse = true
}
enableKcp = this.enableKcp
if enableKcp {
KcpConfig = this.kcpConfig
/*
KCP do not support connectionReuse,
it is mandatory to set connectionReuse to false
Since KCP have no handshake and
does not SlowStart, there isn't benefit to
use that anyway.
*/
connectionReuse = false
}
return nil
}

View File

@ -2,18 +2,38 @@
package transport
import "encoding/json"
import (
"encoding/json"
"github.com/v2ray/v2ray-core/common/log"
"github.com/v2ray/v2ray-core/transport/hub/kcpv"
)
func (this *Config) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
ConnectionReuse bool `json:"connectionReuse"`
ConnectionReuse bool `json:"connectionReuse"`
EnableKcp bool `json:"EnableKCP,omitempty"`
KcpConfig *kcpv.Config `json:"KcpConfig,omitempty"`
}
jsonConfig := &JsonConfig{
ConnectionReuse: true,
EnableKcp: false,
}
if err := json.Unmarshal(data, jsonConfig); err != nil {
return err
}
this.ConnectionReuse = jsonConfig.ConnectionReuse
this.enableKcp = jsonConfig.EnableKcp
if jsonConfig.KcpConfig != nil {
this.kcpConfig = jsonConfig.KcpConfig
if jsonConfig.KcpConfig.AdvancedConfigs == nil {
jsonConfig.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs
}
} else {
if jsonConfig.EnableKcp {
log.Error("transport: You have enabled KCP but no configure is given")
}
}
return nil
}

View File

@ -6,6 +6,7 @@ import (
"time"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport"
)
@ -62,3 +63,40 @@ func DialWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, erro
return dialer.Dial(dest.Network().String(), dest.NetAddr())
}
func Dial3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) {
if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
return DialKCP3(src, dest, proxyMeta)
}
return Dial(src, dest)
}
func DialWithoutCache3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (net.Conn, error) {
if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
return DialKCPWithoutCache(src, dest)
}
return DialWithoutCache(src, dest)
}
func DialKCP3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) {
if src == nil {
src = v2net.AnyIP
}
id := src.String() + "-" + dest.NetAddr()
conn, err := DialWithoutCache3(src, dest, proxyMeta)
if err != nil {
return nil, err
}
return &Connection{
dest: id,
conn: conn,
listener: globalCache,
}, nil
}
/*DialKCPWithoutCache Dial KCP connection
This Dialer will ignore src this is a restriction
due to github.com/xtaci/kcp-go.DialWithOptions
*/
func DialKCPWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, error) {
return DialKCP(dest)
}

193
transport/hub/kcp.go Normal file
View File

@ -0,0 +1,193 @@
package hub
import (
"errors"
"net"
"time"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport"
"github.com/v2ray/v2ray-core/transport/hub/kcpv"
"github.com/xtaci/kcp-go"
)
type KCPVlistener struct {
lst *kcp.Listener
conf *kcpv.Config
previousSocketid map[int]uint32
previousSocketid_mapid int
}
/*Accept Accept a KCP connection
Since KCP is stateless, if package deliver after it was closed,
It could be reconized as a new connection and call accept.
If we can detect that the connection is of such a kind,
we will discard that conn.
*/
func (kvl *KCPVlistener) Accept() (net.Conn, error) {
conn, err := kvl.lst.Accept()
if err != nil {
return nil, err
}
if kvl.previousSocketid == nil {
kvl.previousSocketid = make(map[int]uint32)
}
var badbit bool = false
for _, key := range kvl.previousSocketid {
if key == conn.GetConv() {
badbit = true
}
}
if badbit {
conn.Close()
return nil, errors.New("KCP:ConnDup, Don't worry~")
} else {
kvl.previousSocketid_mapid++
kvl.previousSocketid[kvl.previousSocketid_mapid] = conn.GetConv()
/*
Here we assume that count(connection) < 512
This won't always true.
More work might be necessary to deal with this in a better way.
*/
if kvl.previousSocketid_mapid >= 512 {
delete(kvl.previousSocketid, kvl.previousSocketid_mapid-512)
}
}
kcv := &KCPVconn{hc: conn}
kcv.conf = kvl.conf
err = kcv.ApplyConf()
if err != nil {
return nil, err
}
return kcv, nil
}
func (kvl *KCPVlistener) Close() error {
return kvl.lst.Close()
}
func (kvl *KCPVlistener) Addr() net.Addr {
return kvl.lst.Addr()
}
type KCPVconn struct {
hc *kcp.UDPSession
conf *kcpv.Config
conntokeep time.Time
}
//var counter int
func (kcpvc *KCPVconn) Read(b []byte) (int, error) {
ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.ReadTimeout) * time.Second)
if ifb.After(kcpvc.conntokeep) {
kcpvc.conntokeep = ifb
}
kcpvc.hc.SetDeadline(kcpvc.conntokeep)
return kcpvc.hc.Read(b)
}
func (kcpvc *KCPVconn) Write(b []byte) (int, error) {
ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.WriteTimeout) * time.Second)
if ifb.After(kcpvc.conntokeep) {
kcpvc.conntokeep = ifb
}
kcpvc.hc.SetDeadline(kcpvc.conntokeep)
return kcpvc.hc.Write(b)
}
/*ApplyConf will apply kcpvc.conf to current Socket
It is recommmanded to call this func once and only once
*/
func (kcpvc *KCPVconn) ApplyConf() error {
nodelay, interval, resend, nc := 0, 40, 0, 0
if kcpvc.conf.Mode != "manual" {
switch kcpvc.conf.Mode {
case "normal":
nodelay, interval, resend, nc = 0, 30, 2, 1
case "fast":
nodelay, interval, resend, nc = 0, 20, 2, 1
case "fast2":
nodelay, interval, resend, nc = 1, 20, 2, 1
case "fast3":
nodelay, interval, resend, nc = 1, 10, 2, 1
}
} else {
log.Error("kcp: Failed to Apply configure: Manual mode is not supported.(yet!)")
return errors.New("kcp: Manual Not Implemented")
}
kcpvc.hc.SetNoDelay(nodelay, interval, resend, nc)
kcpvc.hc.SetWindowSize(kcpvc.conf.AdvancedConfigs.Sndwnd, kcpvc.conf.AdvancedConfigs.Rcvwnd)
kcpvc.hc.SetMtu(kcpvc.conf.AdvancedConfigs.Mtu)
kcpvc.hc.SetACKNoDelay(kcpvc.conf.AdvancedConfigs.Acknodelay)
kcpvc.hc.SetDSCP(kcpvc.conf.AdvancedConfigs.Dscp)
//counter++
//log.Info(counter)
return nil
}
/*Close Close the current conn
We have to delay the close of Socket for a few second
or the VMess EOF can be too late to send.
*/
func (kcpvc *KCPVconn) Close() error {
go func() {
time.Sleep(2000 * time.Millisecond)
//counter--
//log.Info(counter)
kcpvc.hc.Close()
}()
return nil
}
func (kcpvc *KCPVconn) LocalAddr() net.Addr {
return kcpvc.hc.LocalAddr()
}
func (kcpvc *KCPVconn) RemoteAddr() net.Addr {
return kcpvc.hc.RemoteAddr()
}
func (kcpvc *KCPVconn) SetDeadline(t time.Time) error {
return kcpvc.hc.SetDeadline(t)
}
func (kcpvc *KCPVconn) SetReadDeadline(t time.Time) error {
return kcpvc.hc.SetReadDeadline(t)
}
func (kcpvc *KCPVconn) SetWriteDeadline(t time.Time) error {
return kcpvc.hc.SetWriteDeadline(t)
}
func DialKCP(dest v2net.Destination) (*KCPVconn, error) {
kcpconf := transport.KcpConfig
cpip, _ := kcpv.GetChipher(kcpconf.Key)
kcv, err := kcp.DialWithOptions(kcpconf.AdvancedConfigs.Fec, dest.NetAddr(), cpip)
if err != nil {
return nil, err
}
kcvn := &KCPVconn{hc: kcv}
kcvn.conf = kcpconf
err = kcvn.ApplyConf()
if err != nil {
return nil, err
}
return kcvn, nil
}
func ListenKCP(address v2net.Address, port v2net.Port) (*KCPVlistener, error) {
kcpconf := transport.KcpConfig
cpip, _ := kcpv.GetChipher(kcpconf.Key)
laddr := address.String() + ":" + port.String()
kcl, err := kcp.ListenWithOptions(kcpconf.AdvancedConfigs.Fec, laddr, cpip)
kcvl := &KCPVlistener{lst: kcl, conf: kcpconf}
return kcvl, err
}

31
transport/hub/kcp_test.go Normal file
View File

@ -0,0 +1,31 @@
package hub_test
import "testing"
import (
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/testing/assert"
"github.com/v2ray/v2ray-core/transport"
"github.com/v2ray/v2ray-core/transport/hub"
"github.com/v2ray/v2ray-core/transport/hub/kcpv"
)
func Test_Pair(t *testing.T) {
assert := assert.On(t)
transport.KcpConfig = &kcpv.Config{}
transport.KcpConfig.Mode = "fast2"
transport.KcpConfig.Key = "key"
transport.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs
lst, _ := hub.ListenKCP(v2net.ParseAddress("127.0.0.1"), 17777)
go func() {
connx, err2 := lst.Accept()
assert.Error(err2).IsNil()
connx.Close()
}()
conn, _ := hub.DialKCP(v2net.TCPDestination(v2net.ParseAddress("127.0.0.1"), 17777))
conn.LocalAddr()
conn.RemoteAddr()
conn.ApplyConf()
conn.Write([]byte("x"))
conn.Close()
}

View File

@ -0,0 +1,65 @@
package kcpv
/*AdvancedConfig define behavior of KCP in detail
MaximumTransmissionUnit:
Largest protocol data unit that the layer can pass onwards
can be discovered by running tracepath
SendingWindowSize , ReceivingWindowSize:
the size of Tx/Rx window, by packet
ForwardErrorCorrectionGroupSize:
The the size of packet to generate a Forward Error Correction
packet, this is used to counteract packet loss.
AcknowledgeNoDelay:
Do not wait a certain of time before sending the ACK packet,
increase bandwich cost and might increase performance
Dscp:
Differentiated services code point,
be used to reconized to discriminate packet.
It is recommanded to keep it 0 to avoid being detected.
ReadTimeout,WriteTimeout:
Close the Socket if it have been silent for too long,
Small value can recycle zombie socket faster but
can cause v2ray to kill the proxy connection it is relaying,
Higher value can prevent server from closing zombie socket and
waste resources.
*/
type AdvancedConfig struct {
Mtu int `json:"MaximumTransmissionUnit"`
Sndwnd int `json:"SendingWindowSize"`
Rcvwnd int `json:"ReceivingWindowSize"`
Fec int `json:"ForwardErrorCorrectionGroupSize"`
Acknodelay bool `json:"AcknowledgeNoDelay"`
Dscp int `json:"Dscp"`
ReadTimeout int `json:"ReadTimeout"`
WriteTimeout int `json:"WriteTimeout"`
}
/*Config define basic behavior of KCP
Mode:
can be one of these values:
fast3,fast2,fast,normal
<<<<<<- less delay
->>>>>> less bandwich wasted
EncryptionKey:
a string that will be the EncryptionKey of
All KCP connection we Listen-Accpet or
Dial, We are not very sure about how this
encryption hehave and DO use a unique randomly
generated key.
*/
type Config struct {
Mode string `json:"Mode"`
Key string `json:"EncryptionKey"`
AdvancedConfigs *AdvancedConfig `json:"AdvancedConfig,omitempty"`
}
var DefaultAdvancedConfigs = &AdvancedConfig{
Mtu: 1350, Sndwnd: 1024, Rcvwnd: 1024, Fec: 4, Dscp: 0, ReadTimeout: 600, WriteTimeout: 500, Acknodelay: false,
}

View File

@ -0,0 +1,3 @@
package kcpv
//We can use the default version of json parser

View File

@ -0,0 +1,21 @@
package kcpv
import (
"crypto/aes"
"crypto/cipher"
"crypto/sha256"
)
func generateKeyFromConfigString(key string) []byte {
key += "consensus salt: Let's fight arcifical deceleration with our code. We shall prove our believes with action."
keyw := sha256.Sum256([]byte(key))
return keyw[:]
}
func generateBlockWithKey(key []byte) (cipher.Block, error) {
return aes.NewCipher(key)
}
func GetChipher(key string) (cipher.Block, error) {
return generateBlockWithKey(generateKeyFromConfigString(key))
}

View File

@ -8,6 +8,8 @@ import (
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport"
)
var (
@ -47,6 +49,36 @@ func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandle
go hub.start()
return hub, nil
}
func ListenKCPhub(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) {
listener, err := ListenKCP(address, port)
if err != nil {
return nil, err
}
var hub *TCPHub
if tlsConfig != nil {
tlsListener := tls.NewListener(listener, tlsConfig)
hub = &TCPHub{
listener: tlsListener,
connCallback: callback,
}
} else {
hub = &TCPHub{
listener: listener,
connCallback: callback,
}
}
go hub.start()
return hub, nil
}
func ListenTCP6(address v2net.Address, port v2net.Port, callback ConnectionHandler, proxyMeta *proxy.InboundHandlerMeta, tlsConfig *tls.Config) (*TCPHub, error) {
if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
return ListenKCPhub(address, port, callback, tlsConfig)
} else {
return ListenTCP(address, port, callback, tlsConfig)
}
return nil, errors.New("ListenTCP6: Not Implemented")
}
func (this *TCPHub) Close() {
this.accepting = false

View File

@ -1,10 +1,18 @@
package transport
import "github.com/v2ray/v2ray-core/transport/hub/kcpv"
var (
connectionReuse = true
enableKcp = false
KcpConfig *kcpv.Config
)
// IsConnectionReusable returns true if V2Ray is trying to reuse TCP connections.
func IsConnectionReusable() bool {
return connectionReuse
}
func IsKcpEnabled() bool {
return enableKcp
}