1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-09-07 04:24:23 -04:00
v2fly/transport/internet/kcp/kcp.go

233 lines
5.5 KiB
Go
Raw Normal View History

2016-06-14 17:25:06 -04:00
// Package kcp implements KCP, a fast and reliable ARQ protocol.
//
// Acknowledgement:
// skywind3000@github for inventing the KCP protocol
// xtaci@github for translating to Golang
package kcp
import (
2016-06-29 04:34:34 -04:00
"github.com/v2ray/v2ray-core/common/log"
2016-06-14 17:25:06 -04:00
)
// _itimediff returns the signed distance from earlier to later.
//
// The subtraction is done in uint32 and then reinterpreted as int32, so the
// result stays correct when the timestamp counter wraps around, as long as the
// two values are less than 2^31 apart.
func _itimediff(later, earlier uint32) int32 {
	delta := later - earlier
	return int32(delta)
}
2016-06-29 04:34:34 -04:00
// State identifies the lifecycle phase a KCP connection is currently in.
type State int
2016-06-18 13:08:02 -04:00
2016-06-29 04:34:34 -04:00
// Connection lifecycle states. The numeric values are 0 through 4, in
// declaration order.
const (
	// StateActive is the zero value of State.
	StateActive State = iota
	// StateReadyToClose is entered when OnClose is called on an active
	// connection.
	StateReadyToClose
	// StatePeerClosed is entered when the peer signals close while the
	// connection is still active.
	StatePeerClosed
	// StateTerminating is the state in which flush keeps sending the
	// Terminated command to the peer.
	StateTerminating
	// StateTerminated is the final state; flush does nothing in it.
	StateTerminated
)
2016-06-14 17:25:06 -04:00
// KCP defines a single KCP connection
type KCP struct {
2016-06-29 06:52:23 -04:00
conv uint16
state State
stateBeginTime uint32
lastIncomingTime uint32
2016-06-29 08:49:49 -04:00
lastPayloadTime uint32
2016-06-29 06:52:23 -04:00
sendingUpdated bool
lastPingTime uint32
2016-07-02 15:26:50 -04:00
mss uint32
rx_rttvar, rx_srtt, rx_rto uint32
current, interval uint32
2016-06-14 17:25:06 -04:00
2016-07-02 15:26:50 -04:00
receivingWorker *ReceivingWorker
2016-07-03 16:14:38 -04:00
sendingWorker *SendingWorker
2016-06-14 17:25:06 -04:00
2016-07-02 17:59:43 -04:00
fastresend uint32
2016-06-20 10:10:47 -04:00
congestionControl bool
2016-07-02 15:26:50 -04:00
output *BufferedSegmentWriter
2016-06-14 17:25:06 -04:00
}
// NewKCP create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection.
2016-07-02 05:19:32 -04:00
func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
2016-06-29 06:52:23 -04:00
log.Debug("KCP|Core: creating KCP ", conv)
2016-06-14 17:25:06 -04:00
kcp := new(KCP)
kcp.conv = conv
2016-07-02 02:45:31 -04:00
kcp.mss = output.Mtu() - DataSegmentOverhead
2016-07-02 05:31:15 -04:00
kcp.rx_rto = 100
kcp.interval = effectiveConfig.Tti
2016-07-02 02:45:31 -04:00
kcp.output = NewSegmentWriter(output)
2016-07-02 15:26:50 -04:00
kcp.receivingWorker = NewReceivingWorker(kcp)
2016-07-02 17:59:43 -04:00
kcp.fastresend = 2
kcp.congestionControl = effectiveConfig.Congestion
2016-07-03 16:14:38 -04:00
kcp.sendingWorker = NewSendingWorker(kcp)
2016-06-14 17:25:06 -04:00
return kcp
}
2016-06-30 08:51:49 -04:00
func (kcp *KCP) SetState(state State) {
kcp.state = state
kcp.stateBeginTime = kcp.current
switch state {
case StateReadyToClose:
2016-07-02 15:26:50 -04:00
kcp.receivingWorker.CloseRead()
2016-06-30 08:51:49 -04:00
case StatePeerClosed:
2016-07-03 16:14:38 -04:00
kcp.sendingWorker.CloseWrite()
2016-06-30 08:51:49 -04:00
case StateTerminating:
2016-07-02 15:26:50 -04:00
kcp.receivingWorker.CloseRead()
2016-07-03 16:14:38 -04:00
kcp.sendingWorker.CloseWrite()
2016-06-30 08:51:49 -04:00
case StateTerminated:
2016-07-02 15:26:50 -04:00
kcp.receivingWorker.CloseRead()
2016-07-03 16:14:38 -04:00
kcp.sendingWorker.CloseWrite()
2016-06-30 08:51:49 -04:00
}
}
2016-06-29 04:34:34 -04:00
// HandleOption reacts to the option flags carried by an incoming segment.
// When the close option is set, the connection is told that the peer closed.
func (kcp *KCP) HandleOption(opt SegmentOption) {
	if (opt & SegmentOptionClose) != SegmentOptionClose {
		return
	}
	kcp.OnPeerClosed()
}
func (kcp *KCP) OnPeerClosed() {
if kcp.state == StateReadyToClose {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateTerminating)
2016-06-29 04:34:34 -04:00
}
if kcp.state == StateActive {
2016-06-30 08:51:49 -04:00
kcp.SetState(StatePeerClosed)
2016-06-29 04:34:34 -04:00
}
}
func (kcp *KCP) OnClose() {
if kcp.state == StateActive {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateReadyToClose)
2016-06-29 04:34:34 -04:00
}
if kcp.state == StatePeerClosed {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateTerminating)
2016-06-29 04:34:34 -04:00
}
}
2016-06-14 17:25:06 -04:00
// https://tools.ietf.org/html/rfc6298
func (kcp *KCP) update_ack(rtt int32) {
if kcp.rx_srtt == 0 {
kcp.rx_srtt = uint32(rtt)
2016-06-25 16:10:17 -04:00
kcp.rx_rttvar = uint32(rtt) / 2
2016-06-14 17:25:06 -04:00
} else {
delta := rtt - int32(kcp.rx_srtt)
if delta < 0 {
delta = -delta
}
2016-06-25 16:10:17 -04:00
kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
2016-06-14 17:25:06 -04:00
kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
2016-06-27 03:14:30 -04:00
if kcp.rx_srtt < kcp.interval {
kcp.rx_srtt = kcp.interval
2016-06-14 17:25:06 -04:00
}
}
2016-06-30 05:35:35 -04:00
var rto uint32
if kcp.interval < 4*kcp.rx_rttvar {
rto = kcp.rx_srtt + 4*kcp.rx_rttvar
} else {
rto = kcp.rx_srtt + kcp.interval
}
2016-07-02 05:31:15 -04:00
if rto > 10000 {
rto = 10000
2016-06-14 19:06:02 -04:00
}
2016-06-27 11:05:01 -04:00
kcp.rx_rto = rto * 3 / 2
2016-06-14 17:25:06 -04:00
}
// Input when you received a low level packet (eg. UDP packet), call it
func (kcp *KCP) Input(data []byte) int {
2016-06-29 06:52:23 -04:00
kcp.lastIncomingTime = kcp.current
2016-06-29 04:34:34 -04:00
var seg ISegment
2016-06-14 17:25:06 -04:00
for {
2016-06-29 04:34:34 -04:00
seg, data = ReadSegment(data)
if seg == nil {
2016-06-14 17:25:06 -04:00
break
}
2016-06-29 04:34:34 -04:00
switch seg := seg.(type) {
case *DataSegment:
kcp.HandleOption(seg.Opt)
2016-07-02 15:26:50 -04:00
kcp.receivingWorker.ProcessSegment(seg)
2016-06-29 08:49:49 -04:00
kcp.lastPayloadTime = kcp.current
2016-07-02 16:17:41 -04:00
case *AckSegment:
2016-06-29 04:34:34 -04:00
kcp.HandleOption(seg.Opt)
2016-07-03 16:14:38 -04:00
kcp.sendingWorker.ProcessAckSegment(seg)
2016-06-29 08:49:49 -04:00
kcp.lastPayloadTime = kcp.current
2016-06-29 04:34:34 -04:00
case *CmdOnlySegment:
kcp.HandleOption(seg.Opt)
if seg.Cmd == SegmentCommandTerminated {
if kcp.state == StateActive ||
kcp.state == StateReadyToClose ||
kcp.state == StatePeerClosed {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateTerminating)
2016-06-29 04:34:34 -04:00
} else if kcp.state == StateTerminating {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateTerminated)
2016-06-29 04:34:34 -04:00
}
}
2016-07-03 16:14:38 -04:00
kcp.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
2016-07-02 15:26:50 -04:00
kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
2016-06-29 04:34:34 -04:00
default:
2016-06-14 17:25:06 -04:00
}
}
return 0
}
// flush pending data
func (kcp *KCP) flush() {
2016-06-29 04:34:34 -04:00
if kcp.state == StateTerminated {
return
}
2016-06-29 08:49:49 -04:00
if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
kcp.OnClose()
}
2016-06-29 04:34:34 -04:00
if kcp.state == StateTerminating {
kcp.output.Write(&CmdOnlySegment{
Conv: kcp.conv,
Cmd: SegmentCommandTerminated,
})
kcp.output.Flush()
if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateTerminated)
2016-06-29 04:34:34 -04:00
}
return
}
if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
2016-06-30 08:51:49 -04:00
kcp.SetState(StateTerminating)
2016-06-29 04:34:34 -04:00
}
2016-06-14 17:25:06 -04:00
// flush acknowledges
2016-07-02 15:26:50 -04:00
kcp.receivingWorker.Flush()
2016-07-03 16:14:38 -04:00
kcp.sendingWorker.Flush()
2016-06-14 17:25:06 -04:00
2016-07-03 16:14:38 -04:00
if kcp.sendingWorker.PingNecessary() || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
2016-06-29 06:52:23 -04:00
seg := &CmdOnlySegment{
2016-06-29 04:34:34 -04:00
Conv: kcp.conv,
Cmd: SegmentCommandPing,
2016-07-02 15:26:50 -04:00
ReceivinNext: kcp.receivingWorker.nextNumber,
2016-07-03 16:14:38 -04:00
SendingNext: kcp.sendingWorker.firstUnacknowledged,
2016-06-29 06:52:23 -04:00
}
if kcp.state == StateReadyToClose {
seg.Opt = SegmentOptionClose
}
kcp.output.Write(seg)
kcp.lastPingTime = kcp.current
kcp.sendingUpdated = false
2016-06-29 04:34:34 -04:00
}
2016-06-29 06:52:23 -04:00
// flash remain segments
kcp.output.Flush()
2016-06-14 17:25:06 -04:00
2016-07-01 05:57:13 -04:00
}
2016-06-14 17:25:06 -04:00
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
func (kcp *KCP) Update(current uint32) {
kcp.current = current
2016-06-30 08:51:49 -04:00
kcp.flush()
2016-06-14 17:25:06 -04:00
}