From 0047910a81758f0fa4a320cfd7cb9a89ec1104ee Mon Sep 17 00:00:00 2001 From: v2ray Date: Wed, 29 Jun 2016 12:52:23 +0200 Subject: [PATCH] bug fixes --- transport/internet/kcp/kcp.go | 86 +++++++++++++------------------ transport/internet/kcp/segment.go | 4 ++ 2 files changed, 39 insertions(+), 51 deletions(-) diff --git a/transport/internet/kcp/kcp.go b/transport/internet/kcp/kcp.go index ef5ebf126..69e8a058a 100644 --- a/transport/internet/kcp/kcp.go +++ b/transport/internet/kcp/kcp.go @@ -63,9 +63,14 @@ const ( // KCP defines a single KCP connection type KCP struct { - conv uint16 - state State - stateBeginTime uint32 + conv uint16 + state State + stateBeginTime uint32 + lastIncomingTime uint32 + sendingUpdated bool + receivingUpdated bool + lastPingTime uint32 + mtu, mss uint32 snd_una, snd_nxt, rcv_nxt uint32 ts_recent, ts_lastack, ssthresh uint32 @@ -83,7 +88,6 @@ type KCP struct { acklist *ACKList - buffer []byte fastresend int32 congestionControl bool output *SegmentWriter @@ -92,13 +96,14 @@ type KCP struct { // NewKCP create a new kcp control object, 'conv' must equal in two endpoint // from the same connection. func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP { + log.Debug("KCP|Core: creating KCP ", conv) kcp := new(KCP) kcp.conv = conv kcp.snd_wnd = sendingWindowSize kcp.rcv_wnd = receivingWindowSize kcp.rmt_wnd = IKCP_WND_RCV kcp.mtu = mtu - kcp.mss = kcp.mtu - IKCP_OVERHEAD + kcp.mss = kcp.mtu - DataSegmentOverhead kcp.rx_rto = IKCP_RTO_DEF kcp.interval = IKCP_INTERVAL kcp.ts_flush = IKCP_INTERVAL @@ -121,13 +126,11 @@ func (kcp *KCP) OnPeerClosed() { if kcp.state == StateReadyToClose { kcp.state = StateTerminating kcp.stateBeginTime = kcp.current - log.Info("KCP terminating at ", kcp.current) } if kcp.state == StateActive { kcp.ClearSendQueue() kcp.state = StatePeerClosed kcp.stateBeginTime = kcp.current - log.Info("KCP peer close at ", kcp.current) } } @@ -135,12 +138,10 @@ func (kcp *KCP) OnClose() { if kcp.state == StateActive { kcp.state = StateReadyToClose kcp.stateBeginTime = kcp.current - log.Info("KCP ready close at ", kcp.current) } if kcp.state == StatePeerClosed { kcp.state = StateTerminating kcp.stateBeginTime = kcp.current - log.Info("KCP terminating at ", kcp.current) } } @@ -228,12 +229,16 @@ func (kcp *KCP) update_ack(rtt int32) { } func (kcp *KCP) shrink_buf() { + prevUna := kcp.snd_una if len(kcp.snd_buf) > 0 { seg := kcp.snd_buf[0] kcp.snd_una = seg.Number } else { kcp.snd_una = kcp.snd_nxt } + if kcp.snd_una != prevUna { + kcp.sendingUpdated = true + } } func (kcp *KCP) parse_ack(sn uint32) { @@ -282,6 +287,7 @@ func (kcp *KCP) HandleReceivingNext(receivingNext uint32) { func (kcp *KCP) HandleSendingNext(sendingNext uint32) { kcp.acklist.Clear(sendingNext) + kcp.receivingUpdated = true } func (kcp *KCP) parse_data(newseg *DataSegment) { @@ -301,7 +307,8 @@ func (kcp *KCP) parse_data(newseg *DataSegment) { // Input when you received a low level packet (eg. UDP packet), call it func (kcp *KCP) Input(data []byte) int { - log.Info("KCP input at ", kcp.current) + kcp.lastIncomingTime = kcp.current + var seg ISegment var maxack uint32 var flag int @@ -347,11 +354,9 @@ func (kcp *KCP) Input(data []byte) int { kcp.state == StatePeerClosed { kcp.state = StateTerminating kcp.stateBeginTime = kcp.current - log.Info("KCP terminating at ", kcp.current) } else if kcp.state == StateTerminating { kcp.state = StateTerminated kcp.stateBeginTime = kcp.current - log.Info("KCP terminated at ", kcp.current) } } kcp.HandleReceivingNext(seg.ReceivinNext) @@ -381,7 +386,6 @@ func (kcp *KCP) flush() { if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 { kcp.state = StateTerminated - log.Info("KCP terminated at ", kcp.current) kcp.stateBeginTime = kcp.current } return @@ -389,12 +393,10 @@ func (kcp *KCP) flush() { if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 { kcp.state = StateTerminating - log.Info("KCP terminating at ", kcp.current) kcp.stateBeginTime = kcp.current } current := kcp.current - segSent := false //lost := false //var seg Segment @@ -410,7 +412,7 @@ func (kcp *KCP) flush() { ackSeg.ReceivingWindow = uint32(kcp.rcv_nxt + kcp.rcv_wnd) ackSeg.ReceivingNext = kcp.rcv_nxt kcp.output.Write(ackSeg) - segSent = true + kcp.receivingUpdated = false } // calculate window size @@ -465,7 +467,7 @@ func (kcp *KCP) flush() { } kcp.output.Write(segment) - segSent = true + kcp.sendingUpdated = false if segment.transmit >= kcp.dead_link { kcp.state = 0xFFFFFFFF @@ -473,43 +475,25 @@ func (kcp *KCP) flush() { } } + if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 { + seg := &CmdOnlySegment{ + Conv: kcp.conv, + Cmd: SegmentCommandPing, + ReceivinNext: kcp.rcv_nxt, + SendingNext: kcp.snd_una, + } + if kcp.state == StateReadyToClose { + seg.Opt = SegmentOptionClose + } + kcp.output.Write(seg) + kcp.lastPingTime = kcp.current + kcp.sendingUpdated = false + kcp.receivingUpdated = false + } + // flash remain segments kcp.output.Flush() - if !segSent && kcp.state == StateReadyToClose { - kcp.output.Write(&CmdOnlySegment{ - Conv: kcp.conv, - Cmd: SegmentCommandPing, - Opt: SegmentOptionClose, - ReceivinNext: kcp.rcv_nxt, - SendingNext: kcp.snd_nxt, - }) - kcp.output.Flush() - segSent = true - } - - if !segSent && kcp.state == StateTerminating { - kcp.output.Write(&CmdOnlySegment{ - Conv: kcp.conv, - Cmd: SegmentCommandTerminated, - ReceivinNext: kcp.rcv_nxt, - SendingNext: kcp.snd_una, - }) - kcp.output.Flush() - segSent = true - } - - if !segSent { - kcp.output.Write(&CmdOnlySegment{ - Conv: kcp.conv, - Cmd: SegmentCommandPing, - ReceivinNext: kcp.rcv_nxt, - SendingNext: kcp.snd_una, - }) - kcp.output.Flush() - segSent = true - } - // update ssthresh // rate halving, https://tools.ietf.org/html/rfc6937 /* diff --git a/transport/internet/kcp/segment.go b/transport/internet/kcp/segment.go index 9d4fba024..52d65d24a 100644 --- a/transport/internet/kcp/segment.go +++ b/transport/internet/kcp/segment.go @@ -28,6 +28,10 @@ type ISegment interface { Bytes([]byte) []byte } +const ( + DataSegmentOverhead = 18 +) + type DataSegment struct { Conv uint16 Opt SegmentOption