From b5910dccae9c741d9204c67f006754c0de4466b4 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Tue, 11 Oct 2016 13:17:57 +0200 Subject: [PATCH] simplify fast resend --- transport/internet/kcp/connection.go | 6 ++--- transport/internet/kcp/segment.go | 5 ++-- transport/internet/kcp/sending.go | 36 +++++++++++++--------------- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index 8f4201099..26c581f15 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -102,7 +102,7 @@ func (this *RoundTripInfo) Update(rtt uint32, current uint32) { if rto > 10000 { rto = 10000 } - this.rto = rto * 3 / 2 + this.rto = rto * 5 / 4 this.updatedTimestamp = current } @@ -184,7 +184,6 @@ type Connection struct { receivingWorker *ReceivingWorker sendingWorker *SendingWorker - fastresend uint32 congestionControl bool output *BufferedSegmentWriter @@ -221,7 +220,6 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, } conn.interval = config.Tti.GetValue() conn.receivingWorker = NewReceivingWorker(conn) - conn.fastresend = 2 conn.congestionControl = config.Congestion conn.sendingWorker = NewSendingWorker(conn) @@ -507,7 +505,7 @@ func (this *Connection) Input(data []byte) int { this.dataUpdater.WakeUp() case *AckSegment: this.HandleOption(seg.Option) - this.sendingWorker.ProcessSegment(current, seg) + this.sendingWorker.ProcessSegment(current, seg, this.roundTrip.Timeout()) this.dataOutputCond.Signal() this.dataUpdater.WakeUp() case *CmdOnlySegment: diff --git a/transport/internet/kcp/segment.go b/transport/internet/kcp/segment.go index 7d31d9511..f7a91448d 100644 --- a/transport/internet/kcp/segment.go +++ b/transport/internet/kcp/segment.go @@ -40,9 +40,8 @@ type DataSegment struct { SendingNext uint32 Data *alloc.Buffer - timeout uint32 - ackSkipped uint32 - transmit uint32 + timeout uint32 + transmit uint32 } func NewDataSegment() *DataSegment { diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index b57beb289..1b84dfa79 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -103,7 +103,7 @@ func (this *SendingWindow) Remove(idx uint32) { } } -func (this *SendingWindow) HandleFastAck(number uint32) { +func (this *SendingWindow) HandleFastAck(number uint32, rto uint32) { if this.len == 0 { return } @@ -114,7 +114,9 @@ func (this *SendingWindow) HandleFastAck(number uint32) { break } if number != seg.Number { - seg.ackSkipped++ + if seg.transmit > 0 && seg.timeout > rto/3 { + seg.timeout -= rto / 3 + } } if i == this.last { break @@ -122,7 +124,7 @@ func (this *SendingWindow) HandleFastAck(number uint32) { } } -func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxInFlightSize uint32) { +func (this *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32) { if this.IsEmpty() { return } @@ -133,25 +135,20 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxI for i := this.start; ; i = this.next[i] { segment := this.data[i] needsend := false - if segment.transmit == 0 { + if current-segment.timeout < 0x7FFFFFFF { + if segment.transmit == 0 { + // First time + this.totalInFlightSize++ + } else { + lost++ + } needsend = true - segment.transmit++ - segment.timeout = current + rto - this.totalInFlightSize++ - } else if current-segment.timeout < 0x7FFFFFFF { - needsend = true - segment.transmit++ - segment.timeout = current + rto - lost++ - } else if segment.ackSkipped >= resend { - needsend = true - segment.transmit++ - segment.ackSkipped = 0 segment.timeout = current + rto } if needsend { segment.Timestamp = current + segment.transmit++ this.writer.Write(segment) inFlightSize++ if inFlightSize >= maxInFlightSize { @@ -228,7 +225,7 @@ func (this *SendingWorker) ProcessAck(number uint32) { this.FindFirstUnacknowledged() } -func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) { +func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint32) { defer seg.Release() this.Lock() @@ -252,7 +249,7 @@ func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) { } } - this.window.HandleFastAck(maxack) + this.window.HandleFastAck(maxack, rto) } func (this *SendingWorker) Push(b []byte) int { @@ -271,7 +268,6 @@ func (this *SendingWorker) Push(b []byte) int { seg.Data = AllocateBuffer().Clear().Append(b[:size]) seg.Number = this.nextNumber seg.timeout = 0 - seg.ackSkipped = 0 seg.transmit = 0 this.window.Push(seg) this.nextNumber++ @@ -326,7 +322,7 @@ func (this *SendingWorker) Flush(current uint32) { } if !this.window.IsEmpty() { - this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd) + this.window.Flush(current, this.conn.roundTrip.Timeout(), cwnd) } else if this.firstUnacknowledgedUpdated { this.conn.Ping(current, CommandPing) }