mirror of
https://github.com/v2fly/v2ray-core.git
synced 2024-12-22 10:08:15 -05:00
customizable kcp code
This commit is contained in:
parent
71ce8f4416
commit
99516bc511
@ -38,15 +38,14 @@ fast3,fast2,fast,normal
|
||||
->>>>>> less bandwich wasted
|
||||
*/
|
||||
type Config struct {
|
||||
Mode string `json:"Mode"`
|
||||
Mtu int `json:"MaximumTransmissionUnit"`
|
||||
Sndwnd int `json:"SendingWindowSize"`
|
||||
Rcvwnd int `json:"ReceivingWindowSize"`
|
||||
Fec int `json:"ForwardErrorCorrectionGroupSize"`
|
||||
Acknodelay bool `json:"AcknowledgeNoDelay"`
|
||||
Dscp int `json:"Dscp"`
|
||||
ReadTimeout int `json:"ReadTimeout"`
|
||||
WriteTimeout int `json:"WriteTimeout"`
|
||||
Mode string
|
||||
Mtu int
|
||||
Sndwnd int
|
||||
Rcvwnd int
|
||||
Acknodelay bool
|
||||
Dscp int
|
||||
ReadTimeout int
|
||||
WriteTimeout int
|
||||
}
|
||||
|
||||
func (this *Config) Apply() {
|
||||
@ -59,7 +58,6 @@ var (
|
||||
Mtu: 1350,
|
||||
Sndwnd: 1024,
|
||||
Rcvwnd: 1024,
|
||||
Fec: 4,
|
||||
Dscp: 0,
|
||||
ReadTimeout: 600,
|
||||
WriteTimeout: 500,
|
||||
|
@ -8,20 +8,45 @@ import (
|
||||
|
||||
func (this *Config) UnmarshalJSON(data []byte) error {
|
||||
type JSONConfig struct {
|
||||
Mode string `json:"Mode"`
|
||||
Mtu int `json:"MaximumTransmissionUnit"`
|
||||
Sndwnd int `json:"SendingWindowSize"`
|
||||
Rcvwnd int `json:"ReceivingWindowSize"`
|
||||
Fec int `json:"ForwardErrorCorrectionGroupSize"`
|
||||
Acknodelay bool `json:"AcknowledgeNoDelay"`
|
||||
Dscp int `json:"Dscp"`
|
||||
ReadTimeout int `json:"ReadTimeout"`
|
||||
WriteTimeout int `json:"WriteTimeout"`
|
||||
Mode *string `json:"Mode"`
|
||||
Mtu *int `json:"MaximumTransmissionUnit"`
|
||||
Sndwnd *int `json:"SendingWindowSize"`
|
||||
Rcvwnd *int `json:"ReceivingWindowSize"`
|
||||
Acknodelay *bool `json:"AcknowledgeNoDelay"`
|
||||
Dscp *int `json:"Dscp"`
|
||||
ReadTimeout *int `json:"ReadTimeout"`
|
||||
WriteTimeout *int `json:"WriteTimeout"`
|
||||
}
|
||||
jsonConfig := effectiveConfig
|
||||
jsonConfig := new(JSONConfig)
|
||||
if err := json.Unmarshal(data, &jsonConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
*this = jsonConfig
|
||||
if jsonConfig.Mode != nil {
|
||||
this.Mode = *jsonConfig.Mode
|
||||
}
|
||||
|
||||
if jsonConfig.Mtu != nil {
|
||||
this.Mtu = *jsonConfig.Mtu
|
||||
}
|
||||
|
||||
if jsonConfig.Sndwnd != nil {
|
||||
this.Sndwnd = *jsonConfig.Sndwnd
|
||||
}
|
||||
if jsonConfig.Rcvwnd != nil {
|
||||
this.Rcvwnd = *jsonConfig.Rcvwnd
|
||||
}
|
||||
if jsonConfig.Acknodelay != nil {
|
||||
this.Acknodelay = *jsonConfig.Acknodelay
|
||||
}
|
||||
if jsonConfig.Dscp != nil {
|
||||
this.Dscp = *jsonConfig.Dscp
|
||||
}
|
||||
if jsonConfig.ReadTimeout != nil {
|
||||
this.ReadTimeout = *jsonConfig.ReadTimeout
|
||||
}
|
||||
if jsonConfig.WriteTimeout != nil {
|
||||
this.WriteTimeout = *jsonConfig.WriteTimeout
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
23
transport/internet/kcp/crypt.go
Normal file
23
transport/internet/kcp/crypt.go
Normal file
@ -0,0 +1,23 @@
|
||||
package kcp
|
||||
|
||||
type BlockCrypt interface {
|
||||
// Encrypt encrypts the whole block in src into dst.
|
||||
// Dst and src may point at the same memory.
|
||||
Encrypt(dst, src []byte)
|
||||
|
||||
// Decrypt decrypts the whole block in src into dst.
|
||||
// Dst and src may point at the same memory.
|
||||
Decrypt(dst, src []byte)
|
||||
}
|
||||
|
||||
// None Encryption
|
||||
type NoneBlockCrypt struct {
|
||||
xortbl []byte
|
||||
}
|
||||
|
||||
func NewNoneBlockCrypt(key []byte) (BlockCrypt, error) {
|
||||
return new(NoneBlockCrypt), nil
|
||||
}
|
||||
|
||||
func (c *NoneBlockCrypt) Encrypt(dst, src []byte) {}
|
||||
func (c *NoneBlockCrypt) Decrypt(dst, src []byte) {}
|
@ -3,13 +3,11 @@ package kcp
|
||||
import (
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/transport/internet"
|
||||
|
||||
"github.com/xtaci/kcp-go"
|
||||
)
|
||||
|
||||
func DialKCP(src v2net.Address, dest v2net.Destination) (internet.Connection, error) {
|
||||
cpip, _ := kcp.NewNoneBlockCrypt(nil)
|
||||
kcv, err := kcp.DialWithOptions(effectiveConfig.Fec, dest.NetAddr(), cpip)
|
||||
cpip, _ := NewNoneBlockCrypt(nil)
|
||||
kcv, err := DialWithOptions(dest.NetAddr(), cpip)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
901
transport/internet/kcp/kcp.go
Normal file
901
transport/internet/kcp/kcp.go
Normal file
@ -0,0 +1,901 @@
|
||||
// Package kcp - A Fast and Reliable ARQ Protocol
|
||||
//
|
||||
// Acknowledgement:
|
||||
// skywind3000@github for inventing the KCP protocol
|
||||
// xtaci@github for translating to Golang
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
)
|
||||
|
||||
const (
|
||||
IKCP_RTO_NDL = 30 // no delay min rto
|
||||
IKCP_RTO_MIN = 100 // normal min rto
|
||||
IKCP_RTO_DEF = 200
|
||||
IKCP_RTO_MAX = 60000
|
||||
IKCP_CMD_PUSH = 81 // cmd: push data
|
||||
IKCP_CMD_ACK = 82 // cmd: ack
|
||||
IKCP_CMD_WASK = 83 // cmd: window probe (ask)
|
||||
IKCP_CMD_WINS = 84 // cmd: window size (tell)
|
||||
IKCP_ASK_SEND = 1 // need to send IKCP_CMD_WASK
|
||||
IKCP_ASK_TELL = 2 // need to send IKCP_CMD_WINS
|
||||
IKCP_WND_SND = 32
|
||||
IKCP_WND_RCV = 32
|
||||
IKCP_MTU_DEF = 1400
|
||||
IKCP_ACK_FAST = 3
|
||||
IKCP_INTERVAL = 100
|
||||
IKCP_OVERHEAD = 24
|
||||
IKCP_DEADLINK = 20
|
||||
IKCP_THRESH_INIT = 2
|
||||
IKCP_THRESH_MIN = 2
|
||||
IKCP_PROBE_INIT = 7000 // 7 secs to probe window size
|
||||
IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
|
||||
)
|
||||
|
||||
// Output is a closure which captures conn and calls conn.Write
|
||||
type Output func(buf []byte, size int)
|
||||
|
||||
/* encode 8 bits unsigned int */
|
||||
func ikcp_encode8u(p []byte, c byte) []byte {
|
||||
p[0] = c
|
||||
return p[1:]
|
||||
}
|
||||
|
||||
/* decode 8 bits unsigned int */
|
||||
func ikcp_decode8u(p []byte, c *byte) []byte {
|
||||
*c = p[0]
|
||||
return p[1:]
|
||||
}
|
||||
|
||||
/* encode 16 bits unsigned int (lsb) */
|
||||
func ikcp_encode16u(p []byte, w uint16) []byte {
|
||||
binary.LittleEndian.PutUint16(p, w)
|
||||
return p[2:]
|
||||
}
|
||||
|
||||
/* decode 16 bits unsigned int (lsb) */
|
||||
func ikcp_decode16u(p []byte, w *uint16) []byte {
|
||||
*w = binary.LittleEndian.Uint16(p)
|
||||
return p[2:]
|
||||
}
|
||||
|
||||
/* encode 32 bits unsigned int (lsb) */
|
||||
func ikcp_encode32u(p []byte, l uint32) []byte {
|
||||
binary.LittleEndian.PutUint32(p, l)
|
||||
return p[4:]
|
||||
}
|
||||
|
||||
/* decode 32 bits unsigned int (lsb) */
|
||||
func ikcp_decode32u(p []byte, l *uint32) []byte {
|
||||
*l = binary.LittleEndian.Uint32(p)
|
||||
return p[4:]
|
||||
}
|
||||
|
||||
func _imin_(a, b uint32) uint32 {
|
||||
if a <= b {
|
||||
return a
|
||||
} else {
|
||||
return b
|
||||
}
|
||||
}
|
||||
|
||||
func _imax_(a, b uint32) uint32 {
|
||||
if a >= b {
|
||||
return a
|
||||
} else {
|
||||
return b
|
||||
}
|
||||
}
|
||||
|
||||
func _ibound_(lower, middle, upper uint32) uint32 {
|
||||
return _imin_(_imax_(lower, middle), upper)
|
||||
}
|
||||
|
||||
func _itimediff(later, earlier uint32) int32 {
|
||||
return (int32)(later - earlier)
|
||||
}
|
||||
|
||||
// Segment defines a KCP segment
|
||||
type Segment struct {
|
||||
conv uint32
|
||||
cmd uint32
|
||||
frg uint32
|
||||
wnd uint32
|
||||
ts uint32
|
||||
sn uint32
|
||||
una uint32
|
||||
resendts uint32
|
||||
rto uint32
|
||||
fastack uint32
|
||||
xmit uint32
|
||||
data []byte
|
||||
}
|
||||
|
||||
// encode a segment into buffer
|
||||
func (seg *Segment) encode(ptr []byte) []byte {
|
||||
ptr = ikcp_encode32u(ptr, seg.conv)
|
||||
ptr = ikcp_encode8u(ptr, uint8(seg.cmd))
|
||||
ptr = ikcp_encode8u(ptr, uint8(seg.frg))
|
||||
ptr = ikcp_encode16u(ptr, uint16(seg.wnd))
|
||||
ptr = ikcp_encode32u(ptr, seg.ts)
|
||||
ptr = ikcp_encode32u(ptr, seg.sn)
|
||||
ptr = ikcp_encode32u(ptr, seg.una)
|
||||
ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
|
||||
return ptr
|
||||
}
|
||||
|
||||
// NewSegment creates a KCP segment
|
||||
func NewSegment(size int) *Segment {
|
||||
seg := new(Segment)
|
||||
seg.data = make([]byte, size)
|
||||
return seg
|
||||
}
|
||||
|
||||
// KCP defines a single KCP connection
|
||||
type KCP struct {
|
||||
conv, mtu, mss, state uint32
|
||||
snd_una, snd_nxt, rcv_nxt uint32
|
||||
ts_recent, ts_lastack, ssthresh uint32
|
||||
rx_rttval, rx_srtt, rx_rto, rx_minrto uint32
|
||||
snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
|
||||
current, interval, ts_flush, xmit uint32
|
||||
nodelay, updated uint32
|
||||
ts_probe, probe_wait uint32
|
||||
dead_link, incr uint32
|
||||
|
||||
snd_queue []Segment
|
||||
rcv_queue []Segment
|
||||
snd_buf []Segment
|
||||
rcv_buf []Segment
|
||||
|
||||
acklist []uint32
|
||||
|
||||
buffer []byte
|
||||
fastresend int32
|
||||
nocwnd int32
|
||||
logmask int32
|
||||
output Output
|
||||
}
|
||||
|
||||
// NewKCP create a new kcp control object, 'conv' must equal in two endpoint
|
||||
// from the same connection.
|
||||
func NewKCP(conv uint32, output Output) *KCP {
|
||||
kcp := new(KCP)
|
||||
kcp.conv = conv
|
||||
kcp.snd_wnd = IKCP_WND_SND
|
||||
kcp.rcv_wnd = IKCP_WND_RCV
|
||||
kcp.rmt_wnd = IKCP_WND_RCV
|
||||
kcp.mtu = IKCP_MTU_DEF
|
||||
kcp.mss = kcp.mtu - IKCP_OVERHEAD
|
||||
kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
|
||||
kcp.rx_rto = IKCP_RTO_DEF
|
||||
kcp.rx_minrto = IKCP_RTO_MIN
|
||||
kcp.interval = IKCP_INTERVAL
|
||||
kcp.ts_flush = IKCP_INTERVAL
|
||||
kcp.ssthresh = IKCP_THRESH_INIT
|
||||
kcp.dead_link = IKCP_DEADLINK
|
||||
kcp.output = output
|
||||
return kcp
|
||||
}
|
||||
|
||||
// PeekSize checks the size of next message in the recv queue
|
||||
func (kcp *KCP) PeekSize() (length int) {
|
||||
if len(kcp.rcv_queue) == 0 {
|
||||
return -1
|
||||
}
|
||||
|
||||
seg := &kcp.rcv_queue[0]
|
||||
if seg.frg == 0 {
|
||||
return len(seg.data)
|
||||
}
|
||||
|
||||
if len(kcp.rcv_queue) < int(seg.frg+1) {
|
||||
return -1
|
||||
}
|
||||
|
||||
for k := range kcp.rcv_queue {
|
||||
seg := &kcp.rcv_queue[k]
|
||||
length += len(seg.data)
|
||||
if seg.frg == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Recv is user/upper level recv: returns size, returns below zero for EAGAIN
|
||||
func (kcp *KCP) Recv(buffer []byte) (n int) {
|
||||
if len(kcp.rcv_queue) == 0 {
|
||||
return -1
|
||||
}
|
||||
|
||||
peeksize := kcp.PeekSize()
|
||||
if peeksize < 0 {
|
||||
return -2
|
||||
}
|
||||
|
||||
if peeksize > len(buffer) {
|
||||
return -3
|
||||
}
|
||||
|
||||
var fast_recover bool
|
||||
if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
|
||||
fast_recover = true
|
||||
}
|
||||
|
||||
// merge fragment
|
||||
count := 0
|
||||
for k := range kcp.rcv_queue {
|
||||
seg := &kcp.rcv_queue[k]
|
||||
copy(buffer, seg.data)
|
||||
buffer = buffer[len(seg.data):]
|
||||
n += len(seg.data)
|
||||
count++
|
||||
if seg.frg == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
kcp.rcv_queue = kcp.rcv_queue[count:]
|
||||
|
||||
// move available data from rcv_buf -> rcv_queue
|
||||
count = 0
|
||||
for k := range kcp.rcv_buf {
|
||||
seg := &kcp.rcv_buf[k]
|
||||
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
|
||||
kcp.rcv_queue = append(kcp.rcv_queue, *seg)
|
||||
kcp.rcv_nxt++
|
||||
count++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
kcp.rcv_buf = kcp.rcv_buf[count:]
|
||||
|
||||
// fast recover
|
||||
if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
|
||||
// ready to send back IKCP_CMD_WINS in ikcp_flush
|
||||
// tell remote my window size
|
||||
kcp.probe |= IKCP_ASK_TELL
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Send is user/upper level send, returns below zero for error
|
||||
func (kcp *KCP) Send(buffer []byte) int {
|
||||
var count int
|
||||
if len(buffer) == 0 {
|
||||
return -1
|
||||
}
|
||||
|
||||
if len(buffer) < int(kcp.mss) {
|
||||
count = 1
|
||||
} else {
|
||||
count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
|
||||
}
|
||||
|
||||
if count > 255 {
|
||||
return -2
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
count = 1
|
||||
}
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
var size int
|
||||
if len(buffer) > int(kcp.mss) {
|
||||
size = int(kcp.mss)
|
||||
} else {
|
||||
size = len(buffer)
|
||||
}
|
||||
seg := NewSegment(size)
|
||||
copy(seg.data, buffer[:size])
|
||||
seg.frg = uint32(count - i - 1)
|
||||
kcp.snd_queue = append(kcp.snd_queue, *seg)
|
||||
buffer = buffer[size:]
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// https://tools.ietf.org/html/rfc6298
|
||||
func (kcp *KCP) update_ack(rtt int32) {
|
||||
var rto uint32 = 0
|
||||
if kcp.rx_srtt == 0 {
|
||||
kcp.rx_srtt = uint32(rtt)
|
||||
kcp.rx_rttval = uint32(rtt) / 2
|
||||
} else {
|
||||
delta := rtt - int32(kcp.rx_srtt)
|
||||
if delta < 0 {
|
||||
delta = -delta
|
||||
}
|
||||
kcp.rx_rttval = (3*kcp.rx_rttval + uint32(delta)) / 4
|
||||
kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
|
||||
if kcp.rx_srtt < 1 {
|
||||
kcp.rx_srtt = 1
|
||||
}
|
||||
}
|
||||
rto = kcp.rx_srtt + _imax_(1, 4*kcp.rx_rttval)
|
||||
kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
|
||||
}
|
||||
|
||||
func (kcp *KCP) shrink_buf() {
|
||||
if len(kcp.snd_buf) > 0 {
|
||||
seg := &kcp.snd_buf[0]
|
||||
kcp.snd_una = seg.sn
|
||||
} else {
|
||||
kcp.snd_una = kcp.snd_nxt
|
||||
}
|
||||
}
|
||||
|
||||
func (kcp *KCP) parse_ack(sn uint32) {
|
||||
if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for k := range kcp.snd_buf {
|
||||
seg := &kcp.snd_buf[k]
|
||||
if sn == seg.sn {
|
||||
kcp.snd_buf = append(kcp.snd_buf[:k], kcp.snd_buf[k+1:]...)
|
||||
break
|
||||
}
|
||||
if _itimediff(sn, seg.sn) < 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (kcp *KCP) parse_fastack(sn uint32) {
|
||||
if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for k := range kcp.snd_buf {
|
||||
seg := &kcp.snd_buf[k]
|
||||
if _itimediff(sn, seg.sn) < 0 {
|
||||
break
|
||||
} else if sn != seg.sn {
|
||||
seg.fastack++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (kcp *KCP) parse_una(una uint32) {
|
||||
count := 0
|
||||
for k := range kcp.snd_buf {
|
||||
seg := &kcp.snd_buf[k]
|
||||
if _itimediff(una, seg.sn) > 0 {
|
||||
count++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
kcp.snd_buf = kcp.snd_buf[count:]
|
||||
}
|
||||
|
||||
// ack append
|
||||
func (kcp *KCP) ack_push(sn, ts uint32) {
|
||||
kcp.acklist = append(kcp.acklist, sn, ts)
|
||||
}
|
||||
|
||||
func (kcp *KCP) ack_get(p int) (sn, ts uint32) {
|
||||
return kcp.acklist[p*2+0], kcp.acklist[p*2+1]
|
||||
}
|
||||
|
||||
func (kcp *KCP) parse_data(newseg *Segment) {
|
||||
sn := newseg.sn
|
||||
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
|
||||
_itimediff(sn, kcp.rcv_nxt) < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
n := len(kcp.rcv_buf) - 1
|
||||
insert_idx := 0
|
||||
repeat := false
|
||||
for i := n; i >= 0; i-- {
|
||||
seg := &kcp.rcv_buf[i]
|
||||
if seg.sn == sn {
|
||||
repeat = true
|
||||
break
|
||||
}
|
||||
if _itimediff(sn, seg.sn) > 0 {
|
||||
insert_idx = i + 1
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !repeat {
|
||||
if insert_idx == n+1 {
|
||||
kcp.rcv_buf = append(kcp.rcv_buf, *newseg)
|
||||
} else {
|
||||
kcp.rcv_buf = append(kcp.rcv_buf, Segment{})
|
||||
copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
|
||||
kcp.rcv_buf[insert_idx] = *newseg
|
||||
}
|
||||
}
|
||||
|
||||
// move available data from rcv_buf -> rcv_queue
|
||||
count := 0
|
||||
for k := range kcp.rcv_buf {
|
||||
seg := &kcp.rcv_buf[k]
|
||||
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
|
||||
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[k])
|
||||
kcp.rcv_nxt++
|
||||
count++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
kcp.rcv_buf = kcp.rcv_buf[count:]
|
||||
}
|
||||
|
||||
// Input when you received a low level packet (eg. UDP packet), call it
|
||||
func (kcp *KCP) Input(data []byte) int {
|
||||
una := kcp.snd_una
|
||||
if len(data) < IKCP_OVERHEAD {
|
||||
return -1
|
||||
}
|
||||
|
||||
var maxack uint32
|
||||
var flag int
|
||||
for {
|
||||
var ts, sn, length, una, conv uint32
|
||||
var wnd uint16
|
||||
var cmd, frg uint8
|
||||
|
||||
if len(data) < int(IKCP_OVERHEAD) {
|
||||
break
|
||||
}
|
||||
|
||||
data = ikcp_decode32u(data, &conv)
|
||||
if conv != kcp.conv {
|
||||
return -1
|
||||
}
|
||||
|
||||
data = ikcp_decode8u(data, &cmd)
|
||||
data = ikcp_decode8u(data, &frg)
|
||||
data = ikcp_decode16u(data, &wnd)
|
||||
data = ikcp_decode32u(data, &ts)
|
||||
data = ikcp_decode32u(data, &sn)
|
||||
data = ikcp_decode32u(data, &una)
|
||||
data = ikcp_decode32u(data, &length)
|
||||
if len(data) < int(length) {
|
||||
return -2
|
||||
}
|
||||
|
||||
if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
|
||||
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
|
||||
return -3
|
||||
}
|
||||
|
||||
kcp.rmt_wnd = uint32(wnd)
|
||||
kcp.parse_una(una)
|
||||
kcp.shrink_buf()
|
||||
|
||||
if cmd == IKCP_CMD_ACK {
|
||||
if _itimediff(kcp.current, ts) >= 0 {
|
||||
kcp.update_ack(_itimediff(kcp.current, ts))
|
||||
}
|
||||
kcp.parse_ack(sn)
|
||||
kcp.shrink_buf()
|
||||
if flag == 0 {
|
||||
flag = 1
|
||||
maxack = sn
|
||||
} else if _itimediff(sn, maxack) > 0 {
|
||||
maxack = sn
|
||||
}
|
||||
} else if cmd == IKCP_CMD_PUSH {
|
||||
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
|
||||
kcp.ack_push(sn, ts)
|
||||
if _itimediff(sn, kcp.rcv_nxt) >= 0 {
|
||||
seg := NewSegment(int(length))
|
||||
seg.conv = conv
|
||||
seg.cmd = uint32(cmd)
|
||||
seg.frg = uint32(frg)
|
||||
seg.wnd = uint32(wnd)
|
||||
seg.ts = ts
|
||||
seg.sn = sn
|
||||
seg.una = una
|
||||
copy(seg.data, data[:length])
|
||||
kcp.parse_data(seg)
|
||||
}
|
||||
}
|
||||
} else if cmd == IKCP_CMD_WASK {
|
||||
// ready to send back IKCP_CMD_WINS in Ikcp_flush
|
||||
// tell remote my window size
|
||||
kcp.probe |= IKCP_ASK_TELL
|
||||
} else if cmd == IKCP_CMD_WINS {
|
||||
// do nothing
|
||||
} else {
|
||||
return -3
|
||||
}
|
||||
|
||||
data = data[length:]
|
||||
}
|
||||
|
||||
if flag != 0 {
|
||||
kcp.parse_fastack(maxack)
|
||||
}
|
||||
|
||||
if _itimediff(kcp.snd_una, una) > 0 {
|
||||
if kcp.cwnd < kcp.rmt_wnd {
|
||||
mss := kcp.mss
|
||||
if kcp.cwnd < kcp.ssthresh {
|
||||
kcp.cwnd++
|
||||
kcp.incr += mss
|
||||
} else {
|
||||
if kcp.incr < mss {
|
||||
kcp.incr = mss
|
||||
}
|
||||
kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
|
||||
if (kcp.cwnd+1)*mss <= kcp.incr {
|
||||
kcp.cwnd++
|
||||
}
|
||||
}
|
||||
if kcp.cwnd > kcp.rmt_wnd {
|
||||
kcp.cwnd = kcp.rmt_wnd
|
||||
kcp.incr = kcp.rmt_wnd * mss
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (kcp *KCP) wnd_unused() int32 {
|
||||
if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
|
||||
return int32(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// flush pending data
|
||||
func (kcp *KCP) flush() {
|
||||
current := kcp.current
|
||||
buffer := kcp.buffer
|
||||
change := 0
|
||||
lost := false
|
||||
|
||||
if kcp.updated == 0 {
|
||||
return
|
||||
}
|
||||
var seg Segment
|
||||
seg.conv = kcp.conv
|
||||
seg.cmd = IKCP_CMD_ACK
|
||||
seg.wnd = uint32(kcp.wnd_unused())
|
||||
seg.una = kcp.rcv_nxt
|
||||
|
||||
// flush acknowledges
|
||||
count := len(kcp.acklist) / 2
|
||||
ptr := buffer
|
||||
for i := 0; i < count; i++ {
|
||||
size := len(buffer) - len(ptr)
|
||||
if size+IKCP_OVERHEAD > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
seg.sn, seg.ts = kcp.ack_get(i)
|
||||
ptr = seg.encode(ptr)
|
||||
}
|
||||
kcp.acklist = nil
|
||||
|
||||
// probe window size (if remote window size equals zero)
|
||||
if kcp.rmt_wnd == 0 {
|
||||
if kcp.probe_wait == 0 {
|
||||
kcp.probe_wait = IKCP_PROBE_INIT
|
||||
kcp.ts_probe = kcp.current + kcp.probe_wait
|
||||
} else {
|
||||
if _itimediff(kcp.current, kcp.ts_probe) >= 0 {
|
||||
if kcp.probe_wait < IKCP_PROBE_INIT {
|
||||
kcp.probe_wait = IKCP_PROBE_INIT
|
||||
}
|
||||
kcp.probe_wait += kcp.probe_wait / 2
|
||||
if kcp.probe_wait > IKCP_PROBE_LIMIT {
|
||||
kcp.probe_wait = IKCP_PROBE_LIMIT
|
||||
}
|
||||
kcp.ts_probe = kcp.current + kcp.probe_wait
|
||||
kcp.probe |= IKCP_ASK_SEND
|
||||
}
|
||||
}
|
||||
} else {
|
||||
kcp.ts_probe = 0
|
||||
kcp.probe_wait = 0
|
||||
}
|
||||
|
||||
// flush window probing commands
|
||||
if (kcp.probe & IKCP_ASK_SEND) != 0 {
|
||||
seg.cmd = IKCP_CMD_WASK
|
||||
size := len(buffer) - len(ptr)
|
||||
if size+IKCP_OVERHEAD > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
ptr = seg.encode(ptr)
|
||||
}
|
||||
|
||||
// flush window probing commands
|
||||
if (kcp.probe & IKCP_ASK_TELL) != 0 {
|
||||
seg.cmd = IKCP_CMD_WINS
|
||||
size := len(buffer) - len(ptr)
|
||||
if size+IKCP_OVERHEAD > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
ptr = seg.encode(ptr)
|
||||
}
|
||||
|
||||
kcp.probe = 0
|
||||
|
||||
// calculate window size
|
||||
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
|
||||
if kcp.nocwnd == 0 {
|
||||
cwnd = _imin_(kcp.cwnd, cwnd)
|
||||
}
|
||||
|
||||
count = 0
|
||||
for k := range kcp.snd_queue {
|
||||
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
|
||||
break
|
||||
}
|
||||
newseg := kcp.snd_queue[k]
|
||||
newseg.conv = kcp.conv
|
||||
newseg.cmd = IKCP_CMD_PUSH
|
||||
newseg.wnd = seg.wnd
|
||||
newseg.ts = current
|
||||
newseg.sn = kcp.snd_nxt
|
||||
newseg.una = kcp.rcv_nxt
|
||||
newseg.resendts = current
|
||||
newseg.rto = kcp.rx_rto
|
||||
newseg.fastack = 0
|
||||
newseg.xmit = 0
|
||||
kcp.snd_buf = append(kcp.snd_buf, newseg)
|
||||
kcp.snd_nxt++
|
||||
count++
|
||||
}
|
||||
kcp.snd_queue = kcp.snd_queue[count:]
|
||||
|
||||
// calculate resent
|
||||
resent := uint32(kcp.fastresend)
|
||||
if kcp.fastresend <= 0 {
|
||||
resent = 0xffffffff
|
||||
}
|
||||
rtomin := (kcp.rx_rto >> 3)
|
||||
if kcp.nodelay != 0 {
|
||||
rtomin = 0
|
||||
}
|
||||
|
||||
// flush data segments
|
||||
for k := range kcp.snd_buf {
|
||||
segment := &kcp.snd_buf[k]
|
||||
needsend := false
|
||||
if segment.xmit == 0 {
|
||||
needsend = true
|
||||
segment.xmit++
|
||||
segment.rto = kcp.rx_rto
|
||||
segment.resendts = current + segment.rto + rtomin
|
||||
} else if _itimediff(current, segment.resendts) >= 0 {
|
||||
needsend = true
|
||||
segment.xmit++
|
||||
kcp.xmit++
|
||||
if kcp.nodelay == 0 {
|
||||
segment.rto += kcp.rx_rto
|
||||
} else {
|
||||
segment.rto += kcp.rx_rto / 2
|
||||
}
|
||||
segment.resendts = current + segment.rto
|
||||
lost = true
|
||||
} else if segment.fastack >= resent {
|
||||
needsend = true
|
||||
segment.xmit++
|
||||
segment.fastack = 0
|
||||
segment.resendts = current + segment.rto
|
||||
change++
|
||||
}
|
||||
|
||||
if needsend {
|
||||
segment.ts = current
|
||||
segment.wnd = seg.wnd
|
||||
segment.una = kcp.rcv_nxt
|
||||
|
||||
size := len(buffer) - len(ptr)
|
||||
need := IKCP_OVERHEAD + len(segment.data)
|
||||
|
||||
if size+need >= int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
|
||||
ptr = segment.encode(ptr)
|
||||
copy(ptr, segment.data)
|
||||
ptr = ptr[len(segment.data):]
|
||||
|
||||
if segment.xmit >= kcp.dead_link {
|
||||
kcp.state = 0xFFFFFFFF
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flash remain segments
|
||||
size := len(buffer) - len(ptr)
|
||||
if size > 0 {
|
||||
kcp.output(buffer, size)
|
||||
}
|
||||
|
||||
// update ssthresh
|
||||
// rate halving, https://tools.ietf.org/html/rfc6937
|
||||
if change != 0 {
|
||||
inflight := kcp.snd_nxt - kcp.snd_una
|
||||
kcp.ssthresh = inflight / 2
|
||||
if kcp.ssthresh < IKCP_THRESH_MIN {
|
||||
kcp.ssthresh = IKCP_THRESH_MIN
|
||||
}
|
||||
kcp.cwnd = kcp.ssthresh + resent
|
||||
kcp.incr = kcp.cwnd * kcp.mss
|
||||
}
|
||||
|
||||
// congestion control, https://tools.ietf.org/html/rfc5681
|
||||
if lost {
|
||||
kcp.ssthresh = cwnd / 2
|
||||
if kcp.ssthresh < IKCP_THRESH_MIN {
|
||||
kcp.ssthresh = IKCP_THRESH_MIN
|
||||
}
|
||||
kcp.cwnd = 1
|
||||
kcp.incr = kcp.mss
|
||||
}
|
||||
|
||||
if kcp.cwnd < 1 {
|
||||
kcp.cwnd = 1
|
||||
kcp.incr = kcp.mss
|
||||
}
|
||||
}
|
||||
|
||||
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
|
||||
// ikcp_check when to call it again (without ikcp_input/_send calling).
|
||||
// 'current' - current timestamp in millisec.
|
||||
func (kcp *KCP) Update(current uint32) {
|
||||
var slap int32
|
||||
|
||||
kcp.current = current
|
||||
|
||||
if kcp.updated == 0 {
|
||||
kcp.updated = 1
|
||||
kcp.ts_flush = kcp.current
|
||||
}
|
||||
|
||||
slap = _itimediff(kcp.current, kcp.ts_flush)
|
||||
|
||||
if slap >= 10000 || slap < -10000 {
|
||||
kcp.ts_flush = kcp.current
|
||||
slap = 0
|
||||
}
|
||||
|
||||
if slap >= 0 {
|
||||
kcp.ts_flush += kcp.interval
|
||||
if _itimediff(kcp.current, kcp.ts_flush) >= 0 {
|
||||
kcp.ts_flush = kcp.current + kcp.interval
|
||||
}
|
||||
kcp.flush()
|
||||
}
|
||||
}
|
||||
|
||||
// Check determines when should you invoke ikcp_update:
|
||||
// returns when you should invoke ikcp_update in millisec, if there
|
||||
// is no ikcp_input/_send calling. you can call ikcp_update in that
|
||||
// time, instead of call update repeatly.
|
||||
// Important to reduce unnacessary ikcp_update invoking. use it to
|
||||
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
|
||||
// or optimize ikcp_update when handling massive kcp connections)
|
||||
func (kcp *KCP) Check(current uint32) uint32 {
|
||||
ts_flush := kcp.ts_flush
|
||||
tm_flush := int32(0x7fffffff)
|
||||
tm_packet := int32(0x7fffffff)
|
||||
minimal := uint32(0)
|
||||
if kcp.updated == 0 {
|
||||
return current
|
||||
}
|
||||
|
||||
if _itimediff(current, ts_flush) >= 10000 ||
|
||||
_itimediff(current, ts_flush) < -10000 {
|
||||
ts_flush = current
|
||||
}
|
||||
|
||||
if _itimediff(current, ts_flush) >= 0 {
|
||||
return current
|
||||
}
|
||||
|
||||
tm_flush = _itimediff(ts_flush, current)
|
||||
|
||||
for k := range kcp.snd_buf {
|
||||
seg := &kcp.snd_buf[k]
|
||||
diff := _itimediff(seg.resendts, current)
|
||||
if diff <= 0 {
|
||||
return current
|
||||
}
|
||||
if diff < tm_packet {
|
||||
tm_packet = diff
|
||||
}
|
||||
}
|
||||
|
||||
minimal = uint32(tm_packet)
|
||||
if tm_packet >= tm_flush {
|
||||
minimal = uint32(tm_flush)
|
||||
}
|
||||
if minimal >= kcp.interval {
|
||||
minimal = kcp.interval
|
||||
}
|
||||
|
||||
return current + minimal
|
||||
}
|
||||
|
||||
// SetMtu changes MTU size, default is 1400
|
||||
func (kcp *KCP) SetMtu(mtu int) int {
|
||||
if mtu < 50 || mtu < IKCP_OVERHEAD {
|
||||
return -1
|
||||
}
|
||||
buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
|
||||
if buffer == nil {
|
||||
return -2
|
||||
}
|
||||
kcp.mtu = uint32(mtu)
|
||||
kcp.mss = kcp.mtu - IKCP_OVERHEAD
|
||||
kcp.buffer = buffer
|
||||
return 0
|
||||
}
|
||||
|
||||
func (kcp *KCP) Interval(interval int) int {
|
||||
if interval > 5000 {
|
||||
interval = 5000
|
||||
} else if interval < 10 {
|
||||
interval = 10
|
||||
}
|
||||
kcp.interval = uint32(interval)
|
||||
return 0
|
||||
}
|
||||
|
||||
// NoDelay options
|
||||
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
|
||||
// nodelay: 0:disable(default), 1:enable
|
||||
// interval: internal update timer interval in millisec, default is 100ms
|
||||
// resend: 0:disable fast resend(default), 1:enable fast resend
|
||||
// nc: 0:normal congestion control(default), 1:disable congestion control
|
||||
func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
|
||||
if nodelay >= 0 {
|
||||
kcp.nodelay = uint32(nodelay)
|
||||
if nodelay != 0 {
|
||||
kcp.rx_minrto = IKCP_RTO_NDL
|
||||
} else {
|
||||
kcp.rx_minrto = IKCP_RTO_MIN
|
||||
}
|
||||
}
|
||||
if interval >= 0 {
|
||||
if interval > 5000 {
|
||||
interval = 5000
|
||||
} else if interval < 10 {
|
||||
interval = 10
|
||||
}
|
||||
kcp.interval = uint32(interval)
|
||||
}
|
||||
if resend >= 0 {
|
||||
kcp.fastresend = int32(resend)
|
||||
}
|
||||
if nc >= 0 {
|
||||
kcp.nocwnd = int32(nc)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default
|
||||
func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
|
||||
if sndwnd > 0 {
|
||||
kcp.snd_wnd = uint32(sndwnd)
|
||||
}
|
||||
if rcvwnd > 0 {
|
||||
kcp.rcv_wnd = uint32(rcvwnd)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// WaitSnd gets how many packet is waiting to be sent
|
||||
func (kcp *KCP) WaitSnd() int {
|
||||
return len(kcp.snd_buf) + len(kcp.snd_queue)
|
||||
}
|
617
transport/internet/kcp/sess.go
Normal file
617
transport/internet/kcp/sess.go
Normal file
@ -0,0 +1,617 @@
|
||||
package kcp
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
var (
|
||||
errTimeout = errors.New("i/o timeout")
|
||||
errBrokenPipe = errors.New("broken pipe")
|
||||
)
|
||||
|
||||
const (
|
||||
basePort = 20000 // minimum port for listening
|
||||
maxPort = 65535 // maximum port for listening
|
||||
defaultWndSize = 128 // default window size, in packet
|
||||
otpSize = 16 // magic number
|
||||
crcSize = 4 // 4bytes packet checksum
|
||||
cryptHeaderSize = otpSize + crcSize
|
||||
connTimeout = 60 * time.Second
|
||||
mtuLimit = 4096
|
||||
rxQueueLimit = 8192
|
||||
rxFecLimit = 2048
|
||||
)
|
||||
|
||||
type (
|
||||
// UDPSession defines a KCP session implemented by UDP
|
||||
UDPSession struct {
|
||||
kcp *KCP // the core ARQ
|
||||
conn *net.UDPConn // the underlying UDP socket
|
||||
block BlockCrypt
|
||||
needUpdate bool
|
||||
l *Listener // point to server listener if it's a server socket
|
||||
local, remote net.Addr
|
||||
rd time.Time // read deadline
|
||||
wd time.Time // write deadline
|
||||
sockbuff []byte // kcp receiving is based on packet, I turn it into stream
|
||||
die chan struct{}
|
||||
isClosed bool
|
||||
mu sync.Mutex
|
||||
chReadEvent chan struct{}
|
||||
chWriteEvent chan struct{}
|
||||
chTicker chan time.Time
|
||||
chUDPOutput chan []byte
|
||||
headerSize int
|
||||
lastInputTs time.Time
|
||||
ackNoDelay bool
|
||||
}
|
||||
)
|
||||
|
||||
// newUDPSession create a new udp session for client or server
|
||||
func newUDPSession(conv uint32, l *Listener, conn *net.UDPConn, remote *net.UDPAddr, block BlockCrypt) *UDPSession {
|
||||
sess := new(UDPSession)
|
||||
sess.chTicker = make(chan time.Time, 1)
|
||||
sess.chUDPOutput = make(chan []byte, rxQueueLimit)
|
||||
sess.die = make(chan struct{})
|
||||
sess.local = conn.LocalAddr()
|
||||
sess.chReadEvent = make(chan struct{}, 1)
|
||||
sess.chWriteEvent = make(chan struct{}, 1)
|
||||
sess.remote = remote
|
||||
sess.conn = conn
|
||||
sess.l = l
|
||||
sess.block = block
|
||||
sess.lastInputTs = time.Now()
|
||||
|
||||
// caculate header size
|
||||
if sess.block != nil {
|
||||
sess.headerSize += cryptHeaderSize
|
||||
}
|
||||
|
||||
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
|
||||
if size >= IKCP_OVERHEAD {
|
||||
ext := make([]byte, sess.headerSize+size)
|
||||
copy(ext[sess.headerSize:], buf)
|
||||
sess.chUDPOutput <- ext
|
||||
}
|
||||
})
|
||||
sess.kcp.WndSize(defaultWndSize, defaultWndSize)
|
||||
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
|
||||
|
||||
go sess.updateTask()
|
||||
go sess.outputTask()
|
||||
if l == nil { // it's a client connection
|
||||
go sess.readLoop()
|
||||
}
|
||||
|
||||
return sess
|
||||
}
|
||||
|
||||
// Read implements the Conn Read method.
|
||||
func (s *UDPSession) Read(b []byte) (n int, err error) {
|
||||
for {
|
||||
s.mu.Lock()
|
||||
if len(s.sockbuff) > 0 { // copy from buffer
|
||||
n = copy(b, s.sockbuff)
|
||||
s.sockbuff = s.sockbuff[n:]
|
||||
s.mu.Unlock()
|
||||
return n, nil
|
||||
}
|
||||
|
||||
if s.isClosed {
|
||||
s.mu.Unlock()
|
||||
return 0, errBrokenPipe
|
||||
}
|
||||
|
||||
if !s.rd.IsZero() {
|
||||
if time.Now().After(s.rd) { // timeout
|
||||
s.mu.Unlock()
|
||||
return 0, errTimeout
|
||||
}
|
||||
}
|
||||
|
||||
if n := s.kcp.PeekSize(); n > 0 { // data arrived
|
||||
if len(b) >= n {
|
||||
s.kcp.Recv(b)
|
||||
} else {
|
||||
buf := make([]byte, n)
|
||||
s.kcp.Recv(buf)
|
||||
n = copy(b, buf)
|
||||
s.sockbuff = buf[n:] // store remaining bytes into sockbuff for next read
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return n, nil
|
||||
}
|
||||
|
||||
var timeout <-chan time.Time
|
||||
if !s.rd.IsZero() {
|
||||
delay := s.rd.Sub(time.Now())
|
||||
timeout = time.After(delay)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// wait for read event or timeout
|
||||
select {
|
||||
case <-s.chReadEvent:
|
||||
case <-timeout:
|
||||
case <-s.die:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write implements the Conn Write method.
|
||||
func (s *UDPSession) Write(b []byte) (n int, err error) {
|
||||
for {
|
||||
s.mu.Lock()
|
||||
if s.isClosed {
|
||||
s.mu.Unlock()
|
||||
return 0, errBrokenPipe
|
||||
}
|
||||
|
||||
if !s.wd.IsZero() {
|
||||
if time.Now().After(s.wd) { // timeout
|
||||
s.mu.Unlock()
|
||||
return 0, errTimeout
|
||||
}
|
||||
}
|
||||
|
||||
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
|
||||
n = len(b)
|
||||
max := s.kcp.mss << 8
|
||||
for {
|
||||
if len(b) <= int(max) { // in most cases
|
||||
s.kcp.Send(b)
|
||||
break
|
||||
} else {
|
||||
s.kcp.Send(b[:max])
|
||||
b = b[max:]
|
||||
}
|
||||
}
|
||||
s.kcp.current = currentMs()
|
||||
s.kcp.flush()
|
||||
s.mu.Unlock()
|
||||
return n, nil
|
||||
}
|
||||
|
||||
var timeout <-chan time.Time
|
||||
if !s.wd.IsZero() {
|
||||
delay := s.wd.Sub(time.Now())
|
||||
timeout = time.After(delay)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// wait for write event or timeout
|
||||
select {
|
||||
case <-s.chWriteEvent:
|
||||
case <-timeout:
|
||||
case <-s.die:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the connection.
|
||||
func (s *UDPSession) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.isClosed {
|
||||
return errBrokenPipe
|
||||
}
|
||||
close(s.die)
|
||||
s.isClosed = true
|
||||
if s.l == nil { // client socket close
|
||||
s.conn.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
|
||||
func (s *UDPSession) LocalAddr() net.Addr {
|
||||
return s.local
|
||||
}
|
||||
|
||||
// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
|
||||
func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
|
||||
|
||||
// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
|
||||
func (s *UDPSession) SetDeadline(t time.Time) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.rd = t
|
||||
s.wd = t
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetReadDeadline implements the Conn SetReadDeadline method.
|
||||
func (s *UDPSession) SetReadDeadline(t time.Time) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.rd = t
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetWriteDeadline implements the Conn SetWriteDeadline method.
|
||||
func (s *UDPSession) SetWriteDeadline(t time.Time) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.wd = t
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetWindowSize set maximum window size
|
||||
func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.kcp.WndSize(sndwnd, rcvwnd)
|
||||
}
|
||||
|
||||
// SetMtu sets the maximum transmission unit
|
||||
func (s *UDPSession) SetMtu(mtu int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.kcp.SetMtu(mtu - s.headerSize)
|
||||
}
|
||||
|
||||
// SetACKNoDelay changes ack flush option, set true to flush ack immediately,
|
||||
func (s *UDPSession) SetACKNoDelay(nodelay bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.ackNoDelay = nodelay
|
||||
}
|
||||
|
||||
// SetNoDelay calls nodelay() of kcp
|
||||
func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.kcp.NoDelay(nodelay, interval, resend, nc)
|
||||
}
|
||||
|
||||
// SetDSCP sets the DSCP field of IP header
|
||||
func (s *UDPSession) SetDSCP(tos int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if err := ipv4.NewConn(s.conn).SetTOS(tos << 2); err != nil {
|
||||
log.Println("set tos:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UDPSession) outputTask() {
|
||||
// ping
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case ext := <-s.chUDPOutput:
|
||||
if s.block != nil {
|
||||
io.ReadFull(crand.Reader, ext[:otpSize]) // OTP
|
||||
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
|
||||
binary.LittleEndian.PutUint32(ext[otpSize:], checksum)
|
||||
s.block.Encrypt(ext, ext)
|
||||
}
|
||||
|
||||
//if rand.Intn(100) < 80 {
|
||||
n, err := s.conn.WriteTo(ext, s.remote)
|
||||
if err != nil {
|
||||
log.Println(err, n)
|
||||
}
|
||||
//}
|
||||
|
||||
case <-ticker.C:
|
||||
sz := rand.Intn(IKCP_MTU_DEF - s.headerSize - IKCP_OVERHEAD)
|
||||
sz += s.headerSize + IKCP_OVERHEAD
|
||||
ping := make([]byte, sz)
|
||||
io.ReadFull(crand.Reader, ping)
|
||||
if s.block != nil {
|
||||
checksum := crc32.ChecksumIEEE(ping[cryptHeaderSize:])
|
||||
binary.LittleEndian.PutUint32(ping[otpSize:], checksum)
|
||||
s.block.Encrypt(ping, ping)
|
||||
}
|
||||
|
||||
n, err := s.conn.WriteTo(ping, s.remote)
|
||||
if err != nil {
|
||||
log.Println(err, n)
|
||||
}
|
||||
case <-s.die:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// kcp update, input loop
|
||||
func (s *UDPSession) updateTask() {
|
||||
var tc <-chan time.Time
|
||||
if s.l == nil { // client
|
||||
ticker := time.NewTicker(10 * time.Millisecond)
|
||||
tc = ticker.C
|
||||
defer ticker.Stop()
|
||||
} else {
|
||||
tc = s.chTicker
|
||||
}
|
||||
|
||||
var nextupdate uint32
|
||||
for {
|
||||
select {
|
||||
case <-tc:
|
||||
s.mu.Lock()
|
||||
current := currentMs()
|
||||
if current >= nextupdate || s.needUpdate {
|
||||
s.kcp.Update(current)
|
||||
nextupdate = s.kcp.Check(current)
|
||||
}
|
||||
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
|
||||
s.notifyWriteEvent()
|
||||
}
|
||||
s.needUpdate = false
|
||||
s.mu.Unlock()
|
||||
case <-s.die:
|
||||
if s.l != nil { // has listener
|
||||
s.l.chDeadlinks <- s.remote
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetConv gets conversation id of a session
|
||||
func (s *UDPSession) GetConv() uint32 {
|
||||
return s.kcp.conv
|
||||
}
|
||||
|
||||
func (s *UDPSession) notifyReadEvent() {
|
||||
select {
|
||||
case s.chReadEvent <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UDPSession) notifyWriteEvent() {
|
||||
select {
|
||||
case s.chWriteEvent <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UDPSession) kcpInput(data []byte) {
|
||||
now := time.Now()
|
||||
if now.Sub(s.lastInputTs) > connTimeout {
|
||||
s.Close()
|
||||
return
|
||||
}
|
||||
s.lastInputTs = now
|
||||
|
||||
s.mu.Lock()
|
||||
s.kcp.current = currentMs()
|
||||
s.kcp.Input(data)
|
||||
|
||||
if s.ackNoDelay {
|
||||
s.kcp.current = currentMs()
|
||||
s.kcp.flush()
|
||||
} else {
|
||||
s.needUpdate = true
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.notifyReadEvent()
|
||||
}
|
||||
|
||||
func (s *UDPSession) receiver(ch chan []byte) {
|
||||
for {
|
||||
data := make([]byte, mtuLimit)
|
||||
if n, _, err := s.conn.ReadFromUDP(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD {
|
||||
ch <- data[:n]
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// read loop for client session
|
||||
func (s *UDPSession) readLoop() {
|
||||
chPacket := make(chan []byte, rxQueueLimit)
|
||||
go s.receiver(chPacket)
|
||||
|
||||
for {
|
||||
select {
|
||||
case data := <-chPacket:
|
||||
dataValid := false
|
||||
if s.block != nil {
|
||||
s.block.Decrypt(data, data)
|
||||
data = data[otpSize:]
|
||||
checksum := crc32.ChecksumIEEE(data[crcSize:])
|
||||
if checksum == binary.LittleEndian.Uint32(data) {
|
||||
data = data[crcSize:]
|
||||
dataValid = true
|
||||
}
|
||||
} else if s.block == nil {
|
||||
dataValid = true
|
||||
}
|
||||
|
||||
if dataValid {
|
||||
s.kcpInput(data)
|
||||
}
|
||||
case <-s.die:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
// Listener defines a server listening for connections
|
||||
Listener struct {
|
||||
block BlockCrypt
|
||||
conn *net.UDPConn
|
||||
sessions map[string]*UDPSession
|
||||
chAccepts chan *UDPSession
|
||||
chDeadlinks chan net.Addr
|
||||
headerSize int
|
||||
die chan struct{}
|
||||
}
|
||||
|
||||
packet struct {
|
||||
from *net.UDPAddr
|
||||
data []byte
|
||||
}
|
||||
)
|
||||
|
||||
// monitor incoming data for all connections of server
|
||||
func (l *Listener) monitor() {
|
||||
chPacket := make(chan packet, rxQueueLimit)
|
||||
go l.receiver(chPacket)
|
||||
ticker := time.NewTicker(10 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case p := <-chPacket:
|
||||
data := p.data
|
||||
from := p.from
|
||||
dataValid := false
|
||||
if l.block != nil {
|
||||
l.block.Decrypt(data, data)
|
||||
data = data[otpSize:]
|
||||
checksum := crc32.ChecksumIEEE(data[crcSize:])
|
||||
if checksum == binary.LittleEndian.Uint32(data) {
|
||||
data = data[crcSize:]
|
||||
dataValid = true
|
||||
}
|
||||
} else if l.block == nil {
|
||||
dataValid = true
|
||||
}
|
||||
|
||||
if dataValid {
|
||||
addr := from.String()
|
||||
s, ok := l.sessions[addr]
|
||||
if !ok { // new session
|
||||
var conv uint32
|
||||
convValid := false
|
||||
|
||||
conv = binary.LittleEndian.Uint32(data)
|
||||
convValid = true
|
||||
|
||||
if convValid {
|
||||
s := newUDPSession(conv, l, l.conn, from, l.block)
|
||||
s.kcpInput(data)
|
||||
l.sessions[addr] = s
|
||||
l.chAccepts <- s
|
||||
}
|
||||
} else {
|
||||
s.kcpInput(data)
|
||||
}
|
||||
}
|
||||
case deadlink := <-l.chDeadlinks:
|
||||
delete(l.sessions, deadlink.String())
|
||||
case <-l.die:
|
||||
return
|
||||
case <-ticker.C:
|
||||
now := time.Now()
|
||||
for _, s := range l.sessions {
|
||||
select {
|
||||
case s.chTicker <- now:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Listener) receiver(ch chan packet) {
|
||||
for {
|
||||
data := make([]byte, mtuLimit)
|
||||
if n, from, err := l.conn.ReadFromUDP(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
|
||||
ch <- packet{from, data[:n]}
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
|
||||
func (l *Listener) Accept() (*UDPSession, error) {
|
||||
select {
|
||||
case c := <-l.chAccepts:
|
||||
return c, nil
|
||||
case <-l.die:
|
||||
return nil, errors.New("listener stopped")
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops listening on the UDP address. Already Accepted connections are not closed.
|
||||
func (l *Listener) Close() error {
|
||||
if err := l.conn.Close(); err == nil {
|
||||
close(l.die)
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
|
||||
func (l *Listener) Addr() net.Addr {
|
||||
return l.conn.LocalAddr()
|
||||
}
|
||||
|
||||
// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
|
||||
func Listen(laddr string) (*Listener, error) {
|
||||
return ListenWithOptions(laddr, nil)
|
||||
}
|
||||
|
||||
// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
|
||||
// FEC = 0 means no FEC, FEC > 0 means num(FEC) as a FEC cluster
|
||||
func ListenWithOptions(laddr string, block BlockCrypt) (*Listener, error) {
|
||||
udpaddr, err := net.ResolveUDPAddr("udp", laddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := net.ListenUDP("udp", udpaddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l := new(Listener)
|
||||
l.conn = conn
|
||||
l.sessions = make(map[string]*UDPSession)
|
||||
l.chAccepts = make(chan *UDPSession, 1024)
|
||||
l.chDeadlinks = make(chan net.Addr, 1024)
|
||||
l.die = make(chan struct{})
|
||||
l.block = block
|
||||
|
||||
// caculate header size
|
||||
if l.block != nil {
|
||||
l.headerSize += cryptHeaderSize
|
||||
}
|
||||
|
||||
go l.monitor()
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// Dial connects to the remote address raddr on the network "udp"
|
||||
func Dial(raddr string) (*UDPSession, error) {
|
||||
return DialWithOptions(raddr, nil)
|
||||
}
|
||||
|
||||
// DialWithOptions connects to the remote address raddr on the network "udp" with packet encryption
|
||||
func DialWithOptions(raddr string, block BlockCrypt) (*UDPSession, error) {
|
||||
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for {
|
||||
port := basePort + rand.Int()%(maxPort-basePort)
|
||||
if udpconn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port}); err == nil {
|
||||
return newUDPSession(rand.Uint32(), nil, udpconn, udpaddr, block), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func currentMs() uint32 {
|
||||
return uint32(time.Now().UnixNano() / int64(time.Millisecond))
|
||||
}
|
@ -7,12 +7,10 @@ import (
|
||||
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/transport/internet"
|
||||
|
||||
"github.com/xtaci/kcp-go"
|
||||
)
|
||||
|
||||
type KCPVlistener struct {
|
||||
lst *kcp.Listener
|
||||
lst *Listener
|
||||
previousSocketid map[int]uint32
|
||||
previousSocketid_mapid int
|
||||
}
|
||||
@ -73,7 +71,7 @@ func (kvl *KCPVlistener) Addr() net.Addr {
|
||||
}
|
||||
|
||||
type KCPVconn struct {
|
||||
hc *kcp.UDPSession
|
||||
hc *UDPSession
|
||||
conntokeep time.Time
|
||||
}
|
||||
|
||||
@ -168,8 +166,8 @@ func (this *KCPVconn) SetReusable(b bool) {
|
||||
|
||||
func ListenKCP(address v2net.Address, port v2net.Port) (internet.Listener, error) {
|
||||
laddr := address.String() + ":" + port.String()
|
||||
crypt, _ := kcp.NewNoneBlockCrypt(nil)
|
||||
kcl, err := kcp.ListenWithOptions(effectiveConfig.Fec, laddr, crypt)
|
||||
crypt, _ := NewNoneBlockCrypt(nil)
|
||||
kcl, err := ListenWithOptions(laddr, crypt)
|
||||
kcvl := &KCPVlistener{lst: kcl}
|
||||
return kcvl, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user