From 99516bc51160042a682ee6d9f94d0a6f5e5345b4 Mon Sep 17 00:00:00 2001 From: v2ray <admin@v2ray.com> Date: Tue, 14 Jun 2016 23:25:06 +0200 Subject: [PATCH] customizable kcp code --- transport/internet/kcp/config.go | 18 +- transport/internet/kcp/config_json.go | 47 +- transport/internet/kcp/crypt.go | 23 + transport/internet/kcp/dialer.go | 6 +- transport/internet/kcp/kcp.go | 901 ++++++++++++++++++++++++++ transport/internet/kcp/sess.go | 617 ++++++++++++++++++ transport/internet/kcp/session.go | 10 +- 7 files changed, 1591 insertions(+), 31 deletions(-) create mode 100644 transport/internet/kcp/crypt.go create mode 100644 transport/internet/kcp/kcp.go create mode 100644 transport/internet/kcp/sess.go diff --git a/transport/internet/kcp/config.go b/transport/internet/kcp/config.go index 149e060a1..bdde10d10 100644 --- a/transport/internet/kcp/config.go +++ b/transport/internet/kcp/config.go @@ -38,15 +38,14 @@ fast3,fast2,fast,normal ->>>>>> less bandwich wasted */ type Config struct { - Mode string `json:"Mode"` - Mtu int `json:"MaximumTransmissionUnit"` - Sndwnd int `json:"SendingWindowSize"` - Rcvwnd int `json:"ReceivingWindowSize"` - Fec int `json:"ForwardErrorCorrectionGroupSize"` - Acknodelay bool `json:"AcknowledgeNoDelay"` - Dscp int `json:"Dscp"` - ReadTimeout int `json:"ReadTimeout"` - WriteTimeout int `json:"WriteTimeout"` + Mode string + Mtu int + Sndwnd int + Rcvwnd int + Acknodelay bool + Dscp int + ReadTimeout int + WriteTimeout int } func (this *Config) Apply() { @@ -59,7 +58,6 @@ var ( Mtu: 1350, Sndwnd: 1024, Rcvwnd: 1024, - Fec: 4, Dscp: 0, ReadTimeout: 600, WriteTimeout: 500, diff --git a/transport/internet/kcp/config_json.go b/transport/internet/kcp/config_json.go index c38b2a042..27571d43c 100644 --- a/transport/internet/kcp/config_json.go +++ b/transport/internet/kcp/config_json.go @@ -8,20 +8,45 @@ import ( func (this *Config) UnmarshalJSON(data []byte) error { type JSONConfig struct { - Mode string `json:"Mode"` - Mtu int `json:"MaximumTransmissionUnit"` - Sndwnd int `json:"SendingWindowSize"` - Rcvwnd int `json:"ReceivingWindowSize"` - Fec int `json:"ForwardErrorCorrectionGroupSize"` - Acknodelay bool `json:"AcknowledgeNoDelay"` - Dscp int `json:"Dscp"` - ReadTimeout int `json:"ReadTimeout"` - WriteTimeout int `json:"WriteTimeout"` + Mode *string `json:"Mode"` + Mtu *int `json:"MaximumTransmissionUnit"` + Sndwnd *int `json:"SendingWindowSize"` + Rcvwnd *int `json:"ReceivingWindowSize"` + Acknodelay *bool `json:"AcknowledgeNoDelay"` + Dscp *int `json:"Dscp"` + ReadTimeout *int `json:"ReadTimeout"` + WriteTimeout *int `json:"WriteTimeout"` } - jsonConfig := effectiveConfig + jsonConfig := new(JSONConfig) if err := json.Unmarshal(data, &jsonConfig); err != nil { return err } - *this = jsonConfig + if jsonConfig.Mode != nil { + this.Mode = *jsonConfig.Mode + } + + if jsonConfig.Mtu != nil { + this.Mtu = *jsonConfig.Mtu + } + + if jsonConfig.Sndwnd != nil { + this.Sndwnd = *jsonConfig.Sndwnd + } + if jsonConfig.Rcvwnd != nil { + this.Rcvwnd = *jsonConfig.Rcvwnd + } + if jsonConfig.Acknodelay != nil { + this.Acknodelay = *jsonConfig.Acknodelay + } + if jsonConfig.Dscp != nil { + this.Dscp = *jsonConfig.Dscp + } + if jsonConfig.ReadTimeout != nil { + this.ReadTimeout = *jsonConfig.ReadTimeout + } + if jsonConfig.WriteTimeout != nil { + this.WriteTimeout = *jsonConfig.WriteTimeout + } + return nil } diff --git a/transport/internet/kcp/crypt.go b/transport/internet/kcp/crypt.go new file mode 100644 index 000000000..02e963abe --- /dev/null +++ b/transport/internet/kcp/crypt.go @@ -0,0 +1,23 @@ +package kcp + +type BlockCrypt interface { + // Encrypt encrypts the whole block in src into dst. + // Dst and src may point at the same memory. + Encrypt(dst, src []byte) + + // Decrypt decrypts the whole block in src into dst. + // Dst and src may point at the same memory. + Decrypt(dst, src []byte) +} + +// None Encryption +type NoneBlockCrypt struct { + xortbl []byte +} + +func NewNoneBlockCrypt(key []byte) (BlockCrypt, error) { + return new(NoneBlockCrypt), nil +} + +func (c *NoneBlockCrypt) Encrypt(dst, src []byte) {} +func (c *NoneBlockCrypt) Decrypt(dst, src []byte) {} diff --git a/transport/internet/kcp/dialer.go b/transport/internet/kcp/dialer.go index 706b2d4f6..1879570c3 100644 --- a/transport/internet/kcp/dialer.go +++ b/transport/internet/kcp/dialer.go @@ -3,13 +3,11 @@ package kcp import ( v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport/internet" - - "github.com/xtaci/kcp-go" ) func DialKCP(src v2net.Address, dest v2net.Destination) (internet.Connection, error) { - cpip, _ := kcp.NewNoneBlockCrypt(nil) - kcv, err := kcp.DialWithOptions(effectiveConfig.Fec, dest.NetAddr(), cpip) + cpip, _ := NewNoneBlockCrypt(nil) + kcv, err := DialWithOptions(dest.NetAddr(), cpip) if err != nil { return nil, err } diff --git a/transport/internet/kcp/kcp.go b/transport/internet/kcp/kcp.go new file mode 100644 index 000000000..18461dbba --- /dev/null +++ b/transport/internet/kcp/kcp.go @@ -0,0 +1,901 @@ +// Package kcp - A Fast and Reliable ARQ Protocol +// +// Acknowledgement: +// skywind3000@github for inventing the KCP protocol +// xtaci@github for translating to Golang +package kcp + +import ( + "encoding/binary" +) + +const ( + IKCP_RTO_NDL = 30 // no delay min rto + IKCP_RTO_MIN = 100 // normal min rto + IKCP_RTO_DEF = 200 + IKCP_RTO_MAX = 60000 + IKCP_CMD_PUSH = 81 // cmd: push data + IKCP_CMD_ACK = 82 // cmd: ack + IKCP_CMD_WASK = 83 // cmd: window probe (ask) + IKCP_CMD_WINS = 84 // cmd: window size (tell) + IKCP_ASK_SEND = 1 // need to send IKCP_CMD_WASK + IKCP_ASK_TELL = 2 // need to send IKCP_CMD_WINS + IKCP_WND_SND = 32 + IKCP_WND_RCV = 32 + IKCP_MTU_DEF = 1400 + IKCP_ACK_FAST = 3 + IKCP_INTERVAL = 100 + IKCP_OVERHEAD = 24 + IKCP_DEADLINK = 20 + IKCP_THRESH_INIT = 2 + IKCP_THRESH_MIN = 2 + IKCP_PROBE_INIT = 7000 // 7 secs to probe window size + IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window +) + +// Output is a closure which captures conn and calls conn.Write +type Output func(buf []byte, size int) + +/* encode 8 bits unsigned int */ +func ikcp_encode8u(p []byte, c byte) []byte { + p[0] = c + return p[1:] +} + +/* decode 8 bits unsigned int */ +func ikcp_decode8u(p []byte, c *byte) []byte { + *c = p[0] + return p[1:] +} + +/* encode 16 bits unsigned int (lsb) */ +func ikcp_encode16u(p []byte, w uint16) []byte { + binary.LittleEndian.PutUint16(p, w) + return p[2:] +} + +/* decode 16 bits unsigned int (lsb) */ +func ikcp_decode16u(p []byte, w *uint16) []byte { + *w = binary.LittleEndian.Uint16(p) + return p[2:] +} + +/* encode 32 bits unsigned int (lsb) */ +func ikcp_encode32u(p []byte, l uint32) []byte { + binary.LittleEndian.PutUint32(p, l) + return p[4:] +} + +/* decode 32 bits unsigned int (lsb) */ +func ikcp_decode32u(p []byte, l *uint32) []byte { + *l = binary.LittleEndian.Uint32(p) + return p[4:] +} + +func _imin_(a, b uint32) uint32 { + if a <= b { + return a + } else { + return b + } +} + +func _imax_(a, b uint32) uint32 { + if a >= b { + return a + } else { + return b + } +} + +func _ibound_(lower, middle, upper uint32) uint32 { + return _imin_(_imax_(lower, middle), upper) +} + +func _itimediff(later, earlier uint32) int32 { + return (int32)(later - earlier) +} + +// Segment defines a KCP segment +type Segment struct { + conv uint32 + cmd uint32 + frg uint32 + wnd uint32 + ts uint32 + sn uint32 + una uint32 + resendts uint32 + rto uint32 + fastack uint32 + xmit uint32 + data []byte +} + +// encode a segment into buffer +func (seg *Segment) encode(ptr []byte) []byte { + ptr = ikcp_encode32u(ptr, seg.conv) + ptr = ikcp_encode8u(ptr, uint8(seg.cmd)) + ptr = ikcp_encode8u(ptr, uint8(seg.frg)) + ptr = ikcp_encode16u(ptr, uint16(seg.wnd)) + ptr = ikcp_encode32u(ptr, seg.ts) + ptr = ikcp_encode32u(ptr, seg.sn) + ptr = ikcp_encode32u(ptr, seg.una) + ptr = ikcp_encode32u(ptr, uint32(len(seg.data))) + return ptr +} + +// NewSegment creates a KCP segment +func NewSegment(size int) *Segment { + seg := new(Segment) + seg.data = make([]byte, size) + return seg +} + +// KCP defines a single KCP connection +type KCP struct { + conv, mtu, mss, state uint32 + snd_una, snd_nxt, rcv_nxt uint32 + ts_recent, ts_lastack, ssthresh uint32 + rx_rttval, rx_srtt, rx_rto, rx_minrto uint32 + snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32 + current, interval, ts_flush, xmit uint32 + nodelay, updated uint32 + ts_probe, probe_wait uint32 + dead_link, incr uint32 + + snd_queue []Segment + rcv_queue []Segment + snd_buf []Segment + rcv_buf []Segment + + acklist []uint32 + + buffer []byte + fastresend int32 + nocwnd int32 + logmask int32 + output Output +} + +// NewKCP create a new kcp control object, 'conv' must equal in two endpoint +// from the same connection. +func NewKCP(conv uint32, output Output) *KCP { + kcp := new(KCP) + kcp.conv = conv + kcp.snd_wnd = IKCP_WND_SND + kcp.rcv_wnd = IKCP_WND_RCV + kcp.rmt_wnd = IKCP_WND_RCV + kcp.mtu = IKCP_MTU_DEF + kcp.mss = kcp.mtu - IKCP_OVERHEAD + kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3) + kcp.rx_rto = IKCP_RTO_DEF + kcp.rx_minrto = IKCP_RTO_MIN + kcp.interval = IKCP_INTERVAL + kcp.ts_flush = IKCP_INTERVAL + kcp.ssthresh = IKCP_THRESH_INIT + kcp.dead_link = IKCP_DEADLINK + kcp.output = output + return kcp +} + +// PeekSize checks the size of next message in the recv queue +func (kcp *KCP) PeekSize() (length int) { + if len(kcp.rcv_queue) == 0 { + return -1 + } + + seg := &kcp.rcv_queue[0] + if seg.frg == 0 { + return len(seg.data) + } + + if len(kcp.rcv_queue) < int(seg.frg+1) { + return -1 + } + + for k := range kcp.rcv_queue { + seg := &kcp.rcv_queue[k] + length += len(seg.data) + if seg.frg == 0 { + break + } + } + return +} + +// Recv is user/upper level recv: returns size, returns below zero for EAGAIN +func (kcp *KCP) Recv(buffer []byte) (n int) { + if len(kcp.rcv_queue) == 0 { + return -1 + } + + peeksize := kcp.PeekSize() + if peeksize < 0 { + return -2 + } + + if peeksize > len(buffer) { + return -3 + } + + var fast_recover bool + if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) { + fast_recover = true + } + + // merge fragment + count := 0 + for k := range kcp.rcv_queue { + seg := &kcp.rcv_queue[k] + copy(buffer, seg.data) + buffer = buffer[len(seg.data):] + n += len(seg.data) + count++ + if seg.frg == 0 { + break + } + } + kcp.rcv_queue = kcp.rcv_queue[count:] + + // move available data from rcv_buf -> rcv_queue + count = 0 + for k := range kcp.rcv_buf { + seg := &kcp.rcv_buf[k] + if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) { + kcp.rcv_queue = append(kcp.rcv_queue, *seg) + kcp.rcv_nxt++ + count++ + } else { + break + } + } + kcp.rcv_buf = kcp.rcv_buf[count:] + + // fast recover + if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + kcp.probe |= IKCP_ASK_TELL + } + return +} + +// Send is user/upper level send, returns below zero for error +func (kcp *KCP) Send(buffer []byte) int { + var count int + if len(buffer) == 0 { + return -1 + } + + if len(buffer) < int(kcp.mss) { + count = 1 + } else { + count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss) + } + + if count > 255 { + return -2 + } + + if count == 0 { + count = 1 + } + + for i := 0; i < count; i++ { + var size int + if len(buffer) > int(kcp.mss) { + size = int(kcp.mss) + } else { + size = len(buffer) + } + seg := NewSegment(size) + copy(seg.data, buffer[:size]) + seg.frg = uint32(count - i - 1) + kcp.snd_queue = append(kcp.snd_queue, *seg) + buffer = buffer[size:] + } + return 0 +} + +// https://tools.ietf.org/html/rfc6298 +func (kcp *KCP) update_ack(rtt int32) { + var rto uint32 = 0 + if kcp.rx_srtt == 0 { + kcp.rx_srtt = uint32(rtt) + kcp.rx_rttval = uint32(rtt) / 2 + } else { + delta := rtt - int32(kcp.rx_srtt) + if delta < 0 { + delta = -delta + } + kcp.rx_rttval = (3*kcp.rx_rttval + uint32(delta)) / 4 + kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8 + if kcp.rx_srtt < 1 { + kcp.rx_srtt = 1 + } + } + rto = kcp.rx_srtt + _imax_(1, 4*kcp.rx_rttval) + kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX) +} + +func (kcp *KCP) shrink_buf() { + if len(kcp.snd_buf) > 0 { + seg := &kcp.snd_buf[0] + kcp.snd_una = seg.sn + } else { + kcp.snd_una = kcp.snd_nxt + } +} + +func (kcp *KCP) parse_ack(sn uint32) { + if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 { + return + } + + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + if sn == seg.sn { + kcp.snd_buf = append(kcp.snd_buf[:k], kcp.snd_buf[k+1:]...) + break + } + if _itimediff(sn, seg.sn) < 0 { + break + } + } +} + +func (kcp *KCP) parse_fastack(sn uint32) { + if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 { + return + } + + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + if _itimediff(sn, seg.sn) < 0 { + break + } else if sn != seg.sn { + seg.fastack++ + } + } +} + +func (kcp *KCP) parse_una(una uint32) { + count := 0 + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + if _itimediff(una, seg.sn) > 0 { + count++ + } else { + break + } + } + kcp.snd_buf = kcp.snd_buf[count:] +} + +// ack append +func (kcp *KCP) ack_push(sn, ts uint32) { + kcp.acklist = append(kcp.acklist, sn, ts) +} + +func (kcp *KCP) ack_get(p int) (sn, ts uint32) { + return kcp.acklist[p*2+0], kcp.acklist[p*2+1] +} + +func (kcp *KCP) parse_data(newseg *Segment) { + sn := newseg.sn + if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 || + _itimediff(sn, kcp.rcv_nxt) < 0 { + return + } + + n := len(kcp.rcv_buf) - 1 + insert_idx := 0 + repeat := false + for i := n; i >= 0; i-- { + seg := &kcp.rcv_buf[i] + if seg.sn == sn { + repeat = true + break + } + if _itimediff(sn, seg.sn) > 0 { + insert_idx = i + 1 + break + } + } + + if !repeat { + if insert_idx == n+1 { + kcp.rcv_buf = append(kcp.rcv_buf, *newseg) + } else { + kcp.rcv_buf = append(kcp.rcv_buf, Segment{}) + copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:]) + kcp.rcv_buf[insert_idx] = *newseg + } + } + + // move available data from rcv_buf -> rcv_queue + count := 0 + for k := range kcp.rcv_buf { + seg := &kcp.rcv_buf[k] + if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) { + kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[k]) + kcp.rcv_nxt++ + count++ + } else { + break + } + } + kcp.rcv_buf = kcp.rcv_buf[count:] +} + +// Input when you received a low level packet (eg. UDP packet), call it +func (kcp *KCP) Input(data []byte) int { + una := kcp.snd_una + if len(data) < IKCP_OVERHEAD { + return -1 + } + + var maxack uint32 + var flag int + for { + var ts, sn, length, una, conv uint32 + var wnd uint16 + var cmd, frg uint8 + + if len(data) < int(IKCP_OVERHEAD) { + break + } + + data = ikcp_decode32u(data, &conv) + if conv != kcp.conv { + return -1 + } + + data = ikcp_decode8u(data, &cmd) + data = ikcp_decode8u(data, &frg) + data = ikcp_decode16u(data, &wnd) + data = ikcp_decode32u(data, &ts) + data = ikcp_decode32u(data, &sn) + data = ikcp_decode32u(data, &una) + data = ikcp_decode32u(data, &length) + if len(data) < int(length) { + return -2 + } + + if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && + cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS { + return -3 + } + + kcp.rmt_wnd = uint32(wnd) + kcp.parse_una(una) + kcp.shrink_buf() + + if cmd == IKCP_CMD_ACK { + if _itimediff(kcp.current, ts) >= 0 { + kcp.update_ack(_itimediff(kcp.current, ts)) + } + kcp.parse_ack(sn) + kcp.shrink_buf() + if flag == 0 { + flag = 1 + maxack = sn + } else if _itimediff(sn, maxack) > 0 { + maxack = sn + } + } else if cmd == IKCP_CMD_PUSH { + if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 { + kcp.ack_push(sn, ts) + if _itimediff(sn, kcp.rcv_nxt) >= 0 { + seg := NewSegment(int(length)) + seg.conv = conv + seg.cmd = uint32(cmd) + seg.frg = uint32(frg) + seg.wnd = uint32(wnd) + seg.ts = ts + seg.sn = sn + seg.una = una + copy(seg.data, data[:length]) + kcp.parse_data(seg) + } + } + } else if cmd == IKCP_CMD_WASK { + // ready to send back IKCP_CMD_WINS in Ikcp_flush + // tell remote my window size + kcp.probe |= IKCP_ASK_TELL + } else if cmd == IKCP_CMD_WINS { + // do nothing + } else { + return -3 + } + + data = data[length:] + } + + if flag != 0 { + kcp.parse_fastack(maxack) + } + + if _itimediff(kcp.snd_una, una) > 0 { + if kcp.cwnd < kcp.rmt_wnd { + mss := kcp.mss + if kcp.cwnd < kcp.ssthresh { + kcp.cwnd++ + kcp.incr += mss + } else { + if kcp.incr < mss { + kcp.incr = mss + } + kcp.incr += (mss*mss)/kcp.incr + (mss / 16) + if (kcp.cwnd+1)*mss <= kcp.incr { + kcp.cwnd++ + } + } + if kcp.cwnd > kcp.rmt_wnd { + kcp.cwnd = kcp.rmt_wnd + kcp.incr = kcp.rmt_wnd * mss + } + } + } + + return 0 +} + +func (kcp *KCP) wnd_unused() int32 { + if len(kcp.rcv_queue) < int(kcp.rcv_wnd) { + return int32(int(kcp.rcv_wnd) - len(kcp.rcv_queue)) + } + return 0 +} + +// flush pending data +func (kcp *KCP) flush() { + current := kcp.current + buffer := kcp.buffer + change := 0 + lost := false + + if kcp.updated == 0 { + return + } + var seg Segment + seg.conv = kcp.conv + seg.cmd = IKCP_CMD_ACK + seg.wnd = uint32(kcp.wnd_unused()) + seg.una = kcp.rcv_nxt + + // flush acknowledges + count := len(kcp.acklist) / 2 + ptr := buffer + for i := 0; i < count; i++ { + size := len(buffer) - len(ptr) + if size+IKCP_OVERHEAD > int(kcp.mtu) { + kcp.output(buffer, size) + ptr = buffer + } + seg.sn, seg.ts = kcp.ack_get(i) + ptr = seg.encode(ptr) + } + kcp.acklist = nil + + // probe window size (if remote window size equals zero) + if kcp.rmt_wnd == 0 { + if kcp.probe_wait == 0 { + kcp.probe_wait = IKCP_PROBE_INIT + kcp.ts_probe = kcp.current + kcp.probe_wait + } else { + if _itimediff(kcp.current, kcp.ts_probe) >= 0 { + if kcp.probe_wait < IKCP_PROBE_INIT { + kcp.probe_wait = IKCP_PROBE_INIT + } + kcp.probe_wait += kcp.probe_wait / 2 + if kcp.probe_wait > IKCP_PROBE_LIMIT { + kcp.probe_wait = IKCP_PROBE_LIMIT + } + kcp.ts_probe = kcp.current + kcp.probe_wait + kcp.probe |= IKCP_ASK_SEND + } + } + } else { + kcp.ts_probe = 0 + kcp.probe_wait = 0 + } + + // flush window probing commands + if (kcp.probe & IKCP_ASK_SEND) != 0 { + seg.cmd = IKCP_CMD_WASK + size := len(buffer) - len(ptr) + if size+IKCP_OVERHEAD > int(kcp.mtu) { + kcp.output(buffer, size) + ptr = buffer + } + ptr = seg.encode(ptr) + } + + // flush window probing commands + if (kcp.probe & IKCP_ASK_TELL) != 0 { + seg.cmd = IKCP_CMD_WINS + size := len(buffer) - len(ptr) + if size+IKCP_OVERHEAD > int(kcp.mtu) { + kcp.output(buffer, size) + ptr = buffer + } + ptr = seg.encode(ptr) + } + + kcp.probe = 0 + + // calculate window size + cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd) + if kcp.nocwnd == 0 { + cwnd = _imin_(kcp.cwnd, cwnd) + } + + count = 0 + for k := range kcp.snd_queue { + if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 { + break + } + newseg := kcp.snd_queue[k] + newseg.conv = kcp.conv + newseg.cmd = IKCP_CMD_PUSH + newseg.wnd = seg.wnd + newseg.ts = current + newseg.sn = kcp.snd_nxt + newseg.una = kcp.rcv_nxt + newseg.resendts = current + newseg.rto = kcp.rx_rto + newseg.fastack = 0 + newseg.xmit = 0 + kcp.snd_buf = append(kcp.snd_buf, newseg) + kcp.snd_nxt++ + count++ + } + kcp.snd_queue = kcp.snd_queue[count:] + + // calculate resent + resent := uint32(kcp.fastresend) + if kcp.fastresend <= 0 { + resent = 0xffffffff + } + rtomin := (kcp.rx_rto >> 3) + if kcp.nodelay != 0 { + rtomin = 0 + } + + // flush data segments + for k := range kcp.snd_buf { + segment := &kcp.snd_buf[k] + needsend := false + if segment.xmit == 0 { + needsend = true + segment.xmit++ + segment.rto = kcp.rx_rto + segment.resendts = current + segment.rto + rtomin + } else if _itimediff(current, segment.resendts) >= 0 { + needsend = true + segment.xmit++ + kcp.xmit++ + if kcp.nodelay == 0 { + segment.rto += kcp.rx_rto + } else { + segment.rto += kcp.rx_rto / 2 + } + segment.resendts = current + segment.rto + lost = true + } else if segment.fastack >= resent { + needsend = true + segment.xmit++ + segment.fastack = 0 + segment.resendts = current + segment.rto + change++ + } + + if needsend { + segment.ts = current + segment.wnd = seg.wnd + segment.una = kcp.rcv_nxt + + size := len(buffer) - len(ptr) + need := IKCP_OVERHEAD + len(segment.data) + + if size+need >= int(kcp.mtu) { + kcp.output(buffer, size) + ptr = buffer + } + + ptr = segment.encode(ptr) + copy(ptr, segment.data) + ptr = ptr[len(segment.data):] + + if segment.xmit >= kcp.dead_link { + kcp.state = 0xFFFFFFFF + } + } + } + + // flash remain segments + size := len(buffer) - len(ptr) + if size > 0 { + kcp.output(buffer, size) + } + + // update ssthresh + // rate halving, https://tools.ietf.org/html/rfc6937 + if change != 0 { + inflight := kcp.snd_nxt - kcp.snd_una + kcp.ssthresh = inflight / 2 + if kcp.ssthresh < IKCP_THRESH_MIN { + kcp.ssthresh = IKCP_THRESH_MIN + } + kcp.cwnd = kcp.ssthresh + resent + kcp.incr = kcp.cwnd * kcp.mss + } + + // congestion control, https://tools.ietf.org/html/rfc5681 + if lost { + kcp.ssthresh = cwnd / 2 + if kcp.ssthresh < IKCP_THRESH_MIN { + kcp.ssthresh = IKCP_THRESH_MIN + } + kcp.cwnd = 1 + kcp.incr = kcp.mss + } + + if kcp.cwnd < 1 { + kcp.cwnd = 1 + kcp.incr = kcp.mss + } +} + +// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask +// ikcp_check when to call it again (without ikcp_input/_send calling). +// 'current' - current timestamp in millisec. +func (kcp *KCP) Update(current uint32) { + var slap int32 + + kcp.current = current + + if kcp.updated == 0 { + kcp.updated = 1 + kcp.ts_flush = kcp.current + } + + slap = _itimediff(kcp.current, kcp.ts_flush) + + if slap >= 10000 || slap < -10000 { + kcp.ts_flush = kcp.current + slap = 0 + } + + if slap >= 0 { + kcp.ts_flush += kcp.interval + if _itimediff(kcp.current, kcp.ts_flush) >= 0 { + kcp.ts_flush = kcp.current + kcp.interval + } + kcp.flush() + } +} + +// Check determines when should you invoke ikcp_update: +// returns when you should invoke ikcp_update in millisec, if there +// is no ikcp_input/_send calling. you can call ikcp_update in that +// time, instead of call update repeatly. +// Important to reduce unnacessary ikcp_update invoking. use it to +// schedule ikcp_update (eg. implementing an epoll-like mechanism, +// or optimize ikcp_update when handling massive kcp connections) +func (kcp *KCP) Check(current uint32) uint32 { + ts_flush := kcp.ts_flush + tm_flush := int32(0x7fffffff) + tm_packet := int32(0x7fffffff) + minimal := uint32(0) + if kcp.updated == 0 { + return current + } + + if _itimediff(current, ts_flush) >= 10000 || + _itimediff(current, ts_flush) < -10000 { + ts_flush = current + } + + if _itimediff(current, ts_flush) >= 0 { + return current + } + + tm_flush = _itimediff(ts_flush, current) + + for k := range kcp.snd_buf { + seg := &kcp.snd_buf[k] + diff := _itimediff(seg.resendts, current) + if diff <= 0 { + return current + } + if diff < tm_packet { + tm_packet = diff + } + } + + minimal = uint32(tm_packet) + if tm_packet >= tm_flush { + minimal = uint32(tm_flush) + } + if minimal >= kcp.interval { + minimal = kcp.interval + } + + return current + minimal +} + +// SetMtu changes MTU size, default is 1400 +func (kcp *KCP) SetMtu(mtu int) int { + if mtu < 50 || mtu < IKCP_OVERHEAD { + return -1 + } + buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3) + if buffer == nil { + return -2 + } + kcp.mtu = uint32(mtu) + kcp.mss = kcp.mtu - IKCP_OVERHEAD + kcp.buffer = buffer + return 0 +} + +func (kcp *KCP) Interval(interval int) int { + if interval > 5000 { + interval = 5000 + } else if interval < 10 { + interval = 10 + } + kcp.interval = uint32(interval) + return 0 +} + +// NoDelay options +// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1) +// nodelay: 0:disable(default), 1:enable +// interval: internal update timer interval in millisec, default is 100ms +// resend: 0:disable fast resend(default), 1:enable fast resend +// nc: 0:normal congestion control(default), 1:disable congestion control +func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int { + if nodelay >= 0 { + kcp.nodelay = uint32(nodelay) + if nodelay != 0 { + kcp.rx_minrto = IKCP_RTO_NDL + } else { + kcp.rx_minrto = IKCP_RTO_MIN + } + } + if interval >= 0 { + if interval > 5000 { + interval = 5000 + } else if interval < 10 { + interval = 10 + } + kcp.interval = uint32(interval) + } + if resend >= 0 { + kcp.fastresend = int32(resend) + } + if nc >= 0 { + kcp.nocwnd = int32(nc) + } + return 0 +} + +// WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default +func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int { + if sndwnd > 0 { + kcp.snd_wnd = uint32(sndwnd) + } + if rcvwnd > 0 { + kcp.rcv_wnd = uint32(rcvwnd) + } + return 0 +} + +// WaitSnd gets how many packet is waiting to be sent +func (kcp *KCP) WaitSnd() int { + return len(kcp.snd_buf) + len(kcp.snd_queue) +} diff --git a/transport/internet/kcp/sess.go b/transport/internet/kcp/sess.go new file mode 100644 index 000000000..596312e66 --- /dev/null +++ b/transport/internet/kcp/sess.go @@ -0,0 +1,617 @@ +package kcp + +import ( + crand "crypto/rand" + "encoding/binary" + "errors" + "hash/crc32" + "io" + "log" + "math/rand" + "net" + "sync" + "time" + + "golang.org/x/net/ipv4" +) + +var ( + errTimeout = errors.New("i/o timeout") + errBrokenPipe = errors.New("broken pipe") +) + +const ( + basePort = 20000 // minimum port for listening + maxPort = 65535 // maximum port for listening + defaultWndSize = 128 // default window size, in packet + otpSize = 16 // magic number + crcSize = 4 // 4bytes packet checksum + cryptHeaderSize = otpSize + crcSize + connTimeout = 60 * time.Second + mtuLimit = 4096 + rxQueueLimit = 8192 + rxFecLimit = 2048 +) + +type ( + // UDPSession defines a KCP session implemented by UDP + UDPSession struct { + kcp *KCP // the core ARQ + conn *net.UDPConn // the underlying UDP socket + block BlockCrypt + needUpdate bool + l *Listener // point to server listener if it's a server socket + local, remote net.Addr + rd time.Time // read deadline + wd time.Time // write deadline + sockbuff []byte // kcp receiving is based on packet, I turn it into stream + die chan struct{} + isClosed bool + mu sync.Mutex + chReadEvent chan struct{} + chWriteEvent chan struct{} + chTicker chan time.Time + chUDPOutput chan []byte + headerSize int + lastInputTs time.Time + ackNoDelay bool + } +) + +// newUDPSession create a new udp session for client or server +func newUDPSession(conv uint32, l *Listener, conn *net.UDPConn, remote *net.UDPAddr, block BlockCrypt) *UDPSession { + sess := new(UDPSession) + sess.chTicker = make(chan time.Time, 1) + sess.chUDPOutput = make(chan []byte, rxQueueLimit) + sess.die = make(chan struct{}) + sess.local = conn.LocalAddr() + sess.chReadEvent = make(chan struct{}, 1) + sess.chWriteEvent = make(chan struct{}, 1) + sess.remote = remote + sess.conn = conn + sess.l = l + sess.block = block + sess.lastInputTs = time.Now() + + // caculate header size + if sess.block != nil { + sess.headerSize += cryptHeaderSize + } + + sess.kcp = NewKCP(conv, func(buf []byte, size int) { + if size >= IKCP_OVERHEAD { + ext := make([]byte, sess.headerSize+size) + copy(ext[sess.headerSize:], buf) + sess.chUDPOutput <- ext + } + }) + sess.kcp.WndSize(defaultWndSize, defaultWndSize) + sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize) + + go sess.updateTask() + go sess.outputTask() + if l == nil { // it's a client connection + go sess.readLoop() + } + + return sess +} + +// Read implements the Conn Read method. +func (s *UDPSession) Read(b []byte) (n int, err error) { + for { + s.mu.Lock() + if len(s.sockbuff) > 0 { // copy from buffer + n = copy(b, s.sockbuff) + s.sockbuff = s.sockbuff[n:] + s.mu.Unlock() + return n, nil + } + + if s.isClosed { + s.mu.Unlock() + return 0, errBrokenPipe + } + + if !s.rd.IsZero() { + if time.Now().After(s.rd) { // timeout + s.mu.Unlock() + return 0, errTimeout + } + } + + if n := s.kcp.PeekSize(); n > 0 { // data arrived + if len(b) >= n { + s.kcp.Recv(b) + } else { + buf := make([]byte, n) + s.kcp.Recv(buf) + n = copy(b, buf) + s.sockbuff = buf[n:] // store remaining bytes into sockbuff for next read + } + s.mu.Unlock() + return n, nil + } + + var timeout <-chan time.Time + if !s.rd.IsZero() { + delay := s.rd.Sub(time.Now()) + timeout = time.After(delay) + } + s.mu.Unlock() + + // wait for read event or timeout + select { + case <-s.chReadEvent: + case <-timeout: + case <-s.die: + } + } +} + +// Write implements the Conn Write method. +func (s *UDPSession) Write(b []byte) (n int, err error) { + for { + s.mu.Lock() + if s.isClosed { + s.mu.Unlock() + return 0, errBrokenPipe + } + + if !s.wd.IsZero() { + if time.Now().After(s.wd) { // timeout + s.mu.Unlock() + return 0, errTimeout + } + } + + if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { + n = len(b) + max := s.kcp.mss << 8 + for { + if len(b) <= int(max) { // in most cases + s.kcp.Send(b) + break + } else { + s.kcp.Send(b[:max]) + b = b[max:] + } + } + s.kcp.current = currentMs() + s.kcp.flush() + s.mu.Unlock() + return n, nil + } + + var timeout <-chan time.Time + if !s.wd.IsZero() { + delay := s.wd.Sub(time.Now()) + timeout = time.After(delay) + } + s.mu.Unlock() + + // wait for write event or timeout + select { + case <-s.chWriteEvent: + case <-timeout: + case <-s.die: + } + } +} + +// Close closes the connection. +func (s *UDPSession) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.isClosed { + return errBrokenPipe + } + close(s.die) + s.isClosed = true + if s.l == nil { // client socket close + s.conn.Close() + } + + return nil +} + +// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it. +func (s *UDPSession) LocalAddr() net.Addr { + return s.local +} + +// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it. +func (s *UDPSession) RemoteAddr() net.Addr { return s.remote } + +// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline. +func (s *UDPSession) SetDeadline(t time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.rd = t + s.wd = t + return nil +} + +// SetReadDeadline implements the Conn SetReadDeadline method. +func (s *UDPSession) SetReadDeadline(t time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.rd = t + return nil +} + +// SetWriteDeadline implements the Conn SetWriteDeadline method. +func (s *UDPSession) SetWriteDeadline(t time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + s.wd = t + return nil +} + +// SetWindowSize set maximum window size +func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) { + s.mu.Lock() + defer s.mu.Unlock() + s.kcp.WndSize(sndwnd, rcvwnd) +} + +// SetMtu sets the maximum transmission unit +func (s *UDPSession) SetMtu(mtu int) { + s.mu.Lock() + defer s.mu.Unlock() + s.kcp.SetMtu(mtu - s.headerSize) +} + +// SetACKNoDelay changes ack flush option, set true to flush ack immediately, +func (s *UDPSession) SetACKNoDelay(nodelay bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.ackNoDelay = nodelay +} + +// SetNoDelay calls nodelay() of kcp +func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) { + s.mu.Lock() + defer s.mu.Unlock() + s.kcp.NoDelay(nodelay, interval, resend, nc) +} + +// SetDSCP sets the DSCP field of IP header +func (s *UDPSession) SetDSCP(tos int) { + s.mu.Lock() + defer s.mu.Unlock() + if err := ipv4.NewConn(s.conn).SetTOS(tos << 2); err != nil { + log.Println("set tos:", err) + } +} + +func (s *UDPSession) outputTask() { + // ping + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case ext := <-s.chUDPOutput: + if s.block != nil { + io.ReadFull(crand.Reader, ext[:otpSize]) // OTP + checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:]) + binary.LittleEndian.PutUint32(ext[otpSize:], checksum) + s.block.Encrypt(ext, ext) + } + + //if rand.Intn(100) < 80 { + n, err := s.conn.WriteTo(ext, s.remote) + if err != nil { + log.Println(err, n) + } + //} + + case <-ticker.C: + sz := rand.Intn(IKCP_MTU_DEF - s.headerSize - IKCP_OVERHEAD) + sz += s.headerSize + IKCP_OVERHEAD + ping := make([]byte, sz) + io.ReadFull(crand.Reader, ping) + if s.block != nil { + checksum := crc32.ChecksumIEEE(ping[cryptHeaderSize:]) + binary.LittleEndian.PutUint32(ping[otpSize:], checksum) + s.block.Encrypt(ping, ping) + } + + n, err := s.conn.WriteTo(ping, s.remote) + if err != nil { + log.Println(err, n) + } + case <-s.die: + return + } + } +} + +// kcp update, input loop +func (s *UDPSession) updateTask() { + var tc <-chan time.Time + if s.l == nil { // client + ticker := time.NewTicker(10 * time.Millisecond) + tc = ticker.C + defer ticker.Stop() + } else { + tc = s.chTicker + } + + var nextupdate uint32 + for { + select { + case <-tc: + s.mu.Lock() + current := currentMs() + if current >= nextupdate || s.needUpdate { + s.kcp.Update(current) + nextupdate = s.kcp.Check(current) + } + if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { + s.notifyWriteEvent() + } + s.needUpdate = false + s.mu.Unlock() + case <-s.die: + if s.l != nil { // has listener + s.l.chDeadlinks <- s.remote + } + return + } + } +} + +// GetConv gets conversation id of a session +func (s *UDPSession) GetConv() uint32 { + return s.kcp.conv +} + +func (s *UDPSession) notifyReadEvent() { + select { + case s.chReadEvent <- struct{}{}: + default: + } +} + +func (s *UDPSession) notifyWriteEvent() { + select { + case s.chWriteEvent <- struct{}{}: + default: + } +} + +func (s *UDPSession) kcpInput(data []byte) { + now := time.Now() + if now.Sub(s.lastInputTs) > connTimeout { + s.Close() + return + } + s.lastInputTs = now + + s.mu.Lock() + s.kcp.current = currentMs() + s.kcp.Input(data) + + if s.ackNoDelay { + s.kcp.current = currentMs() + s.kcp.flush() + } else { + s.needUpdate = true + } + s.mu.Unlock() + s.notifyReadEvent() +} + +func (s *UDPSession) receiver(ch chan []byte) { + for { + data := make([]byte, mtuLimit) + if n, _, err := s.conn.ReadFromUDP(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD { + ch <- data[:n] + } else if err != nil { + return + } + } +} + +// read loop for client session +func (s *UDPSession) readLoop() { + chPacket := make(chan []byte, rxQueueLimit) + go s.receiver(chPacket) + + for { + select { + case data := <-chPacket: + dataValid := false + if s.block != nil { + s.block.Decrypt(data, data) + data = data[otpSize:] + checksum := crc32.ChecksumIEEE(data[crcSize:]) + if checksum == binary.LittleEndian.Uint32(data) { + data = data[crcSize:] + dataValid = true + } + } else if s.block == nil { + dataValid = true + } + + if dataValid { + s.kcpInput(data) + } + case <-s.die: + return + } + } +} + +type ( + // Listener defines a server listening for connections + Listener struct { + block BlockCrypt + conn *net.UDPConn + sessions map[string]*UDPSession + chAccepts chan *UDPSession + chDeadlinks chan net.Addr + headerSize int + die chan struct{} + } + + packet struct { + from *net.UDPAddr + data []byte + } +) + +// monitor incoming data for all connections of server +func (l *Listener) monitor() { + chPacket := make(chan packet, rxQueueLimit) + go l.receiver(chPacket) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + select { + case p := <-chPacket: + data := p.data + from := p.from + dataValid := false + if l.block != nil { + l.block.Decrypt(data, data) + data = data[otpSize:] + checksum := crc32.ChecksumIEEE(data[crcSize:]) + if checksum == binary.LittleEndian.Uint32(data) { + data = data[crcSize:] + dataValid = true + } + } else if l.block == nil { + dataValid = true + } + + if dataValid { + addr := from.String() + s, ok := l.sessions[addr] + if !ok { // new session + var conv uint32 + convValid := false + + conv = binary.LittleEndian.Uint32(data) + convValid = true + + if convValid { + s := newUDPSession(conv, l, l.conn, from, l.block) + s.kcpInput(data) + l.sessions[addr] = s + l.chAccepts <- s + } + } else { + s.kcpInput(data) + } + } + case deadlink := <-l.chDeadlinks: + delete(l.sessions, deadlink.String()) + case <-l.die: + return + case <-ticker.C: + now := time.Now() + for _, s := range l.sessions { + select { + case s.chTicker <- now: + default: + } + } + } + } +} + +func (l *Listener) receiver(ch chan packet) { + for { + data := make([]byte, mtuLimit) + if n, from, err := l.conn.ReadFromUDP(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD { + ch <- packet{from, data[:n]} + } else if err != nil { + return + } + } +} + +// Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn. +func (l *Listener) Accept() (*UDPSession, error) { + select { + case c := <-l.chAccepts: + return c, nil + case <-l.die: + return nil, errors.New("listener stopped") + } +} + +// Close stops listening on the UDP address. Already Accepted connections are not closed. +func (l *Listener) Close() error { + if err := l.conn.Close(); err == nil { + close(l.die) + return nil + } else { + return err + } +} + +// Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it. +func (l *Listener) Addr() net.Addr { + return l.conn.LocalAddr() +} + +// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp", +func Listen(laddr string) (*Listener, error) { + return ListenWithOptions(laddr, nil) +} + +// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption, +// FEC = 0 means no FEC, FEC > 0 means num(FEC) as a FEC cluster +func ListenWithOptions(laddr string, block BlockCrypt) (*Listener, error) { + udpaddr, err := net.ResolveUDPAddr("udp", laddr) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", udpaddr) + if err != nil { + return nil, err + } + + l := new(Listener) + l.conn = conn + l.sessions = make(map[string]*UDPSession) + l.chAccepts = make(chan *UDPSession, 1024) + l.chDeadlinks = make(chan net.Addr, 1024) + l.die = make(chan struct{}) + l.block = block + + // caculate header size + if l.block != nil { + l.headerSize += cryptHeaderSize + } + + go l.monitor() + return l, nil +} + +// Dial connects to the remote address raddr on the network "udp" +func Dial(raddr string) (*UDPSession, error) { + return DialWithOptions(raddr, nil) +} + +// DialWithOptions connects to the remote address raddr on the network "udp" with packet encryption +func DialWithOptions(raddr string, block BlockCrypt) (*UDPSession, error) { + udpaddr, err := net.ResolveUDPAddr("udp", raddr) + if err != nil { + return nil, err + } + + for { + port := basePort + rand.Int()%(maxPort-basePort) + if udpconn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port}); err == nil { + return newUDPSession(rand.Uint32(), nil, udpconn, udpaddr, block), nil + } + } +} + +func currentMs() uint32 { + return uint32(time.Now().UnixNano() / int64(time.Millisecond)) +} diff --git a/transport/internet/kcp/session.go b/transport/internet/kcp/session.go index 0b34b18e9..dccdd9a52 100644 --- a/transport/internet/kcp/session.go +++ b/transport/internet/kcp/session.go @@ -7,12 +7,10 @@ import ( v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport/internet" - - "github.com/xtaci/kcp-go" ) type KCPVlistener struct { - lst *kcp.Listener + lst *Listener previousSocketid map[int]uint32 previousSocketid_mapid int } @@ -73,7 +71,7 @@ func (kvl *KCPVlistener) Addr() net.Addr { } type KCPVconn struct { - hc *kcp.UDPSession + hc *UDPSession conntokeep time.Time } @@ -168,8 +166,8 @@ func (this *KCPVconn) SetReusable(b bool) { func ListenKCP(address v2net.Address, port v2net.Port) (internet.Listener, error) { laddr := address.String() + ":" + port.String() - crypt, _ := kcp.NewNoneBlockCrypt(nil) - kcl, err := kcp.ListenWithOptions(effectiveConfig.Fec, laddr, crypt) + crypt, _ := NewNoneBlockCrypt(nil) + kcl, err := ListenWithOptions(laddr, crypt) kcvl := &KCPVlistener{lst: kcl} return kcvl, err }