1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-07-01 19:45:24 +00:00
v2fly/transport/internet/kcp/kcp.go

433 lines
9.7 KiB
Go
Raw Normal View History

2016-06-14 21:25:06 +00:00
// Package kcp - A Fast and Reliable ARQ Protocol
//
// Acknowledgement:
// skywind3000@github for inventing the KCP protocol
// xtaci@github for translating to Golang
package kcp
import (
2016-06-18 16:53:29 +00:00
"github.com/v2ray/v2ray-core/common/alloc"
2016-06-29 08:34:34 +00:00
"github.com/v2ray/v2ray-core/common/log"
2016-06-14 21:25:06 +00:00
)
func _itimediff(later, earlier uint32) int32 {
return (int32)(later - earlier)
}
2016-06-29 08:34:34 +00:00
type State int
2016-06-18 17:08:02 +00:00
2016-06-29 08:34:34 +00:00
const (
StateActive State = 0
StateReadyToClose State = 1
StatePeerClosed State = 2
StateTerminating State = 3
StateTerminated State = 4
)
2016-06-14 21:25:06 +00:00
// KCP defines a single KCP connection
type KCP struct {
2016-06-29 10:52:23 +00:00
conv uint16
state State
stateBeginTime uint32
lastIncomingTime uint32
2016-06-29 12:49:49 +00:00
lastPayloadTime uint32
2016-06-29 10:52:23 +00:00
sendingUpdated bool
receivingUpdated bool
lastPingTime uint32
2016-07-02 06:45:31 +00:00
mss uint32
2016-06-30 12:51:49 +00:00
snd_una, snd_nxt, rcv_nxt uint32
rx_rttvar, rx_srtt, rx_rto uint32
snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
current, interval uint32
2016-06-14 21:25:06 +00:00
2016-06-26 21:51:17 +00:00
snd_queue *SendingQueue
2016-06-30 12:51:49 +00:00
rcv_queue *ReceivingQueue
2016-07-01 09:57:13 +00:00
snd_buf *SendingWindow
2016-06-25 19:35:18 +00:00
rcv_buf *ReceivingWindow
2016-06-14 21:25:06 +00:00
2016-07-02 09:33:34 +00:00
acklist *AckList
2016-06-14 21:25:06 +00:00
2016-06-20 14:10:47 +00:00
fastresend int32
congestionControl bool
2016-06-29 08:34:34 +00:00
output *SegmentWriter
2016-06-14 21:25:06 +00:00
}
// NewKCP create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection.
2016-07-02 09:19:32 +00:00
func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
2016-06-29 10:52:23 +00:00
log.Debug("KCP|Core: creating KCP ", conv)
2016-06-14 21:25:06 +00:00
kcp := new(KCP)
kcp.conv = conv
2016-07-02 09:19:32 +00:00
kcp.snd_wnd = effectiveConfig.GetSendingWindowSize()
kcp.rcv_wnd = effectiveConfig.GetReceivingWindowSize()
2016-07-02 09:31:15 +00:00
kcp.rmt_wnd = 32
2016-07-02 06:45:31 +00:00
kcp.mss = output.Mtu() - DataSegmentOverhead
2016-07-02 09:31:15 +00:00
kcp.rx_rto = 100
kcp.interval = effectiveConfig.Tti
2016-07-02 06:45:31 +00:00
kcp.output = NewSegmentWriter(output)
2016-07-02 09:19:32 +00:00
kcp.rcv_buf = NewReceivingWindow(effectiveConfig.GetReceivingWindowSize())
kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize())
2016-06-30 12:51:49 +00:00
kcp.rcv_queue = NewReceivingQueue()
2016-06-30 20:19:30 +00:00
kcp.acklist = NewACKList(kcp)
2016-07-02 09:19:32 +00:00
kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize())
2016-06-29 20:30:38 +00:00
kcp.cwnd = kcp.snd_wnd
2016-06-14 21:25:06 +00:00
return kcp
}
2016-06-30 12:51:49 +00:00
func (kcp *KCP) SetState(state State) {
kcp.state = state
kcp.stateBeginTime = kcp.current
switch state {
case StateReadyToClose:
kcp.rcv_queue.Close()
case StatePeerClosed:
kcp.ClearSendQueue()
case StateTerminating:
kcp.rcv_queue.Close()
case StateTerminated:
kcp.rcv_queue.Close()
}
}
2016-06-29 08:34:34 +00:00
func (kcp *KCP) HandleOption(opt SegmentOption) {
if (opt & SegmentOptionClose) == SegmentOptionClose {
kcp.OnPeerClosed()
}
}
func (kcp *KCP) OnPeerClosed() {
if kcp.state == StateReadyToClose {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateTerminating)
2016-06-29 08:34:34 +00:00
}
if kcp.state == StateActive {
2016-06-30 12:51:49 +00:00
kcp.SetState(StatePeerClosed)
2016-06-29 08:34:34 +00:00
}
}
func (kcp *KCP) OnClose() {
if kcp.state == StateActive {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateReadyToClose)
2016-06-29 08:34:34 +00:00
}
if kcp.state == StatePeerClosed {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateTerminating)
2016-06-29 08:34:34 +00:00
}
}
2016-06-25 19:35:18 +00:00
// DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
// @Private
func (kcp *KCP) DumpReceivingBuf() {
for {
seg := kcp.rcv_buf.RemoveFirst()
if seg == nil {
break
}
2016-06-30 12:51:49 +00:00
kcp.rcv_queue.Put(seg.Data)
seg.Data = nil
2016-06-25 19:35:18 +00:00
kcp.rcv_buf.Advance()
kcp.rcv_nxt++
2016-06-29 21:41:04 +00:00
kcp.receivingUpdated = true
2016-06-25 19:35:18 +00:00
}
}
2016-06-14 21:25:06 +00:00
// Send is user/upper level send, returns below zero for error
func (kcp *KCP) Send(buffer []byte) int {
2016-06-26 21:51:17 +00:00
nBytes := 0
for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
2016-06-14 21:25:06 +00:00
var size int
if len(buffer) > int(kcp.mss) {
size = int(kcp.mss)
} else {
size = len(buffer)
}
2016-06-29 08:34:34 +00:00
seg := &DataSegment{
Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
}
2016-06-26 21:51:17 +00:00
kcp.snd_queue.Push(seg)
2016-06-14 21:25:06 +00:00
buffer = buffer[size:]
2016-06-26 21:51:17 +00:00
nBytes += size
2016-06-14 21:25:06 +00:00
}
2016-06-26 21:51:17 +00:00
return nBytes
2016-06-14 21:25:06 +00:00
}
// https://tools.ietf.org/html/rfc6298
func (kcp *KCP) update_ack(rtt int32) {
if kcp.rx_srtt == 0 {
kcp.rx_srtt = uint32(rtt)
2016-06-25 20:10:17 +00:00
kcp.rx_rttvar = uint32(rtt) / 2
2016-06-14 21:25:06 +00:00
} else {
delta := rtt - int32(kcp.rx_srtt)
if delta < 0 {
delta = -delta
}
2016-06-25 20:10:17 +00:00
kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
2016-06-14 21:25:06 +00:00
kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
2016-06-27 07:14:30 +00:00
if kcp.rx_srtt < kcp.interval {
kcp.rx_srtt = kcp.interval
2016-06-14 21:25:06 +00:00
}
}
2016-06-30 09:35:35 +00:00
var rto uint32
if kcp.interval < 4*kcp.rx_rttvar {
rto = kcp.rx_srtt + 4*kcp.rx_rttvar
} else {
rto = kcp.rx_srtt + kcp.interval
}
2016-07-02 09:31:15 +00:00
if rto > 10000 {
rto = 10000
2016-06-14 23:06:02 +00:00
}
2016-06-27 15:05:01 +00:00
kcp.rx_rto = rto * 3 / 2
2016-06-14 21:25:06 +00:00
}
func (kcp *KCP) shrink_buf() {
2016-06-29 10:52:23 +00:00
prevUna := kcp.snd_una
2016-07-01 09:57:13 +00:00
if kcp.snd_buf.Len() > 0 {
seg := kcp.snd_buf.First()
2016-06-29 08:34:34 +00:00
kcp.snd_una = seg.Number
2016-06-14 21:25:06 +00:00
} else {
kcp.snd_una = kcp.snd_nxt
}
2016-06-29 10:52:23 +00:00
if kcp.snd_una != prevUna {
kcp.sendingUpdated = true
}
2016-06-14 21:25:06 +00:00
}
func (kcp *KCP) parse_ack(sn uint32) {
if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
return
}
2016-07-01 09:57:13 +00:00
kcp.snd_buf.Remove(sn - kcp.snd_una)
2016-06-14 21:25:06 +00:00
}
func (kcp *KCP) parse_fastack(sn uint32) {
if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
return
}
2016-07-01 09:57:13 +00:00
kcp.snd_buf.HandleFastAck(sn)
2016-06-14 21:25:06 +00:00
}
2016-06-29 08:34:34 +00:00
func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
2016-07-01 09:57:13 +00:00
kcp.snd_buf.Clear(receivingNext)
2016-06-14 21:25:06 +00:00
}
2016-06-29 08:34:34 +00:00
func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
2016-06-30 20:19:30 +00:00
kcp.acklist.Clear(sendingNext)
2016-06-14 21:25:06 +00:00
}
2016-06-29 08:34:34 +00:00
func (kcp *KCP) parse_data(newseg *DataSegment) {
sn := newseg.Number
2016-06-14 21:25:06 +00:00
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
_itimediff(sn, kcp.rcv_nxt) < 0 {
return
}
2016-06-25 19:35:18 +00:00
idx := sn - kcp.rcv_nxt
if !kcp.rcv_buf.Set(idx, newseg) {
newseg.Release()
2016-06-14 21:25:06 +00:00
}
2016-06-25 19:35:18 +00:00
kcp.DumpReceivingBuf()
2016-06-14 21:25:06 +00:00
}
// Input when you received a low level packet (eg. UDP packet), call it
func (kcp *KCP) Input(data []byte) int {
2016-06-29 10:52:23 +00:00
kcp.lastIncomingTime = kcp.current
2016-06-29 08:34:34 +00:00
var seg ISegment
2016-06-14 21:25:06 +00:00
var maxack uint32
var flag int
for {
2016-06-29 08:34:34 +00:00
seg, data = ReadSegment(data)
if seg == nil {
2016-06-14 21:25:06 +00:00
break
}
2016-06-29 08:34:34 +00:00
switch seg := seg.(type) {
case *DataSegment:
kcp.HandleOption(seg.Opt)
kcp.HandleSendingNext(seg.SendingNext)
kcp.acklist.Add(seg.Number, seg.Timestamp)
2016-06-29 21:41:04 +00:00
kcp.receivingUpdated = true
2016-06-29 08:34:34 +00:00
kcp.parse_data(seg)
2016-06-29 12:49:49 +00:00
kcp.lastPayloadTime = kcp.current
2016-06-29 08:34:34 +00:00
case *ACKSegment:
kcp.HandleOption(seg.Opt)
if kcp.rmt_wnd < seg.ReceivingWindow {
kcp.rmt_wnd = seg.ReceivingWindow
2016-06-14 21:25:06 +00:00
}
2016-06-29 08:34:34 +00:00
kcp.HandleReceivingNext(seg.ReceivingNext)
for i := 0; i < int(seg.Count); i++ {
ts := seg.TimestampList[i]
sn := seg.NumberList[i]
if _itimediff(kcp.current, ts) >= 0 {
kcp.update_ack(_itimediff(kcp.current, ts))
}
kcp.parse_ack(sn)
2016-07-01 21:27:57 +00:00
kcp.shrink_buf()
2016-06-29 08:34:34 +00:00
if flag == 0 {
flag = 1
maxack = sn
} else if _itimediff(sn, maxack) > 0 {
maxack = sn
2016-06-14 21:25:06 +00:00
}
}
2016-06-29 12:49:49 +00:00
kcp.lastPayloadTime = kcp.current
2016-06-29 08:34:34 +00:00
case *CmdOnlySegment:
kcp.HandleOption(seg.Opt)
if seg.Cmd == SegmentCommandTerminated {
if kcp.state == StateActive ||
kcp.state == StateReadyToClose ||
kcp.state == StatePeerClosed {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateTerminating)
2016-06-29 08:34:34 +00:00
} else if kcp.state == StateTerminating {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateTerminated)
2016-06-29 08:34:34 +00:00
}
}
kcp.HandleReceivingNext(seg.ReceivinNext)
kcp.HandleSendingNext(seg.SendingNext)
2016-07-01 21:27:57 +00:00
kcp.shrink_buf()
2016-06-29 08:34:34 +00:00
default:
2016-06-14 21:25:06 +00:00
}
}
if flag != 0 {
kcp.parse_fastack(maxack)
}
return 0
}
// flush pending data
func (kcp *KCP) flush() {
2016-06-29 08:34:34 +00:00
if kcp.state == StateTerminated {
return
}
2016-06-29 12:49:49 +00:00
if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
kcp.OnClose()
}
2016-06-29 08:34:34 +00:00
if kcp.state == StateTerminating {
kcp.output.Write(&CmdOnlySegment{
Conv: kcp.conv,
Cmd: SegmentCommandTerminated,
})
kcp.output.Flush()
if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateTerminated)
2016-06-29 08:34:34 +00:00
}
return
}
if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
2016-06-30 12:51:49 +00:00
kcp.SetState(StateTerminating)
2016-06-29 08:34:34 +00:00
}
2016-06-14 21:25:06 +00:00
current := kcp.current
// flush acknowledges
2016-06-30 20:19:30 +00:00
if kcp.acklist.Flush() {
2016-06-29 10:52:23 +00:00
kcp.receivingUpdated = false
2016-06-14 21:25:06 +00:00
}
// calculate window size
2016-06-30 09:35:35 +00:00
cwnd := kcp.snd_una + kcp.snd_wnd
if cwnd < kcp.rmt_wnd {
cwnd = kcp.rmt_wnd
}
2016-06-30 09:19:35 +00:00
if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
cwnd = kcp.snd_una + kcp.cwnd
2016-06-14 21:25:06 +00:00
}
2016-06-26 21:51:17 +00:00
for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
2016-06-29 08:34:34 +00:00
seg := kcp.snd_queue.Pop()
seg.Conv = kcp.conv
seg.Number = kcp.snd_nxt
seg.timeout = current
seg.ackSkipped = 0
seg.transmit = 0
2016-07-01 09:57:13 +00:00
kcp.snd_buf.Push(seg)
2016-06-14 21:25:06 +00:00
kcp.snd_nxt++
}
// flush data segments
2016-07-01 09:57:13 +00:00
if kcp.snd_buf.Flush() {
kcp.sendingUpdated = false
2016-06-14 21:25:06 +00:00
}
2016-06-29 10:52:23 +00:00
if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
seg := &CmdOnlySegment{
2016-06-29 08:34:34 +00:00
Conv: kcp.conv,
Cmd: SegmentCommandPing,
ReceivinNext: kcp.rcv_nxt,
SendingNext: kcp.snd_una,
2016-06-29 10:52:23 +00:00
}
if kcp.state == StateReadyToClose {
seg.Opt = SegmentOptionClose
}
kcp.output.Write(seg)
kcp.lastPingTime = kcp.current
kcp.sendingUpdated = false
kcp.receivingUpdated = false
2016-06-29 08:34:34 +00:00
}
2016-06-29 10:52:23 +00:00
// flash remain segments
kcp.output.Flush()
2016-06-14 21:25:06 +00:00
2016-07-01 09:57:13 +00:00
}
func (kcp *KCP) HandleLost(lost bool) {
if !kcp.congestionControl {
return
}
if lost {
kcp.cwnd = 3 * kcp.cwnd / 4
} else {
kcp.cwnd += kcp.cwnd / 4
}
if kcp.cwnd < 4 {
kcp.cwnd = 4
}
if kcp.cwnd > kcp.snd_wnd {
kcp.cwnd = kcp.snd_wnd
2016-06-29 20:30:38 +00:00
}
2016-06-14 21:25:06 +00:00
}
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
func (kcp *KCP) Update(current uint32) {
kcp.current = current
2016-06-30 12:51:49 +00:00
kcp.flush()
2016-06-14 21:25:06 +00:00
}
// NoDelay options
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
2016-06-25 19:35:18 +00:00
func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
kcp.interval = interval
2016-06-14 21:25:06 +00:00
if resend >= 0 {
kcp.fastresend = int32(resend)
}
2016-06-20 14:10:47 +00:00
kcp.congestionControl = congestionControl
2016-06-14 21:25:06 +00:00
return 0
}
// WaitSnd gets how many packet is waiting to be sent
2016-06-25 19:35:18 +00:00
func (kcp *KCP) WaitSnd() uint32 {
2016-07-01 09:57:13 +00:00
return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
2016-06-14 21:25:06 +00:00
}
2016-06-23 21:17:17 +00:00
func (this *KCP) ClearSendQueue() {
2016-06-26 21:51:17 +00:00
this.snd_queue.Clear()
2016-07-01 09:57:13 +00:00
this.snd_buf.Clear(0xFFFFFFFF)
2016-06-23 21:17:17 +00:00
}