From 6ea8691a071dc8246c5d98bc64b0f438ff650112 Mon Sep 17 00:00:00 2001 From: v2ray Date: Mon, 4 Jul 2016 15:54:18 +0200 Subject: [PATCH] correctly apply cwnd --- transport/internet/kcp/sending.go | 38 +++++++++++++++++-------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index 469b8725e..0ff2f19ce 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -16,13 +16,12 @@ type SendingWindow struct { prev []uint32 next []uint32 - inFlightSize uint32 totalInFlightSize uint32 writer SegmentWriter onPacketLoss func(uint32) } -func NewSendingWindow(size uint32, inFlightSize uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow { +func NewSendingWindow(size uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow { window := &SendingWindow{ start: 0, cap: size, @@ -33,7 +32,6 @@ func NewSendingWindow(size uint32, inFlightSize uint32, writer SegmentWriter, on next: make([]uint32, size), writer: writer, onPacketLoss: onPacketLoss, - inFlightSize: inFlightSize, } return window } @@ -42,6 +40,14 @@ func (this *SendingWindow) Len() int { return int(this.len) } +func (this *SendingWindow) Size() uint32 { + return this.cap +} + +func (this *SendingWindow) IsFull() bool { + return this.len == this.cap +} + func (this *SendingWindow) Push(seg *DataSegment) { pos := (this.start + this.len) % this.cap this.data[pos] = seg @@ -114,7 +120,7 @@ func (this *SendingWindow) HandleFastAck(number uint32) { } } -func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32) { +func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxInFlightSize uint32) { if this.Len() == 0 { return } @@ -145,7 +151,7 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32) { if needsend { this.writer.Write(segment) inFlightSize++ - if inFlightSize >= this.inFlightSize { + if inFlightSize >= maxInFlightSize { break } } @@ -224,7 +230,6 @@ type SendingWorker struct { kcp *KCP window *SendingWindow queue *SendingQueue - windowSize uint32 firstUnacknowledged uint32 nextNumber uint32 remoteNextNumber uint32 @@ -239,10 +244,9 @@ func NewSendingWorker(kcp *KCP) *SendingWorker { queue: NewSendingQueue(effectiveConfig.GetSendingQueueSize()), fastResend: 2, remoteNextNumber: 32, - windowSize: effectiveConfig.GetSendingWindowSize(), - controlWindow: effectiveConfig.GetSendingWindowSize(), + controlWindow: effectiveConfig.GetSendingInFlightSize(), } - worker.window = NewSendingWindow(effectiveConfig.GetSendingWindowSize(), effectiveConfig.GetSendingInFlightSize(), worker, worker.OnPacketLoss) + worker.window = NewSendingWindow(effectiveConfig.GetSendingWindowSize(), worker, worker.OnPacketLoss) return worker } @@ -268,7 +272,7 @@ func (this *SendingWorker) FindFirstUnacknowledged() { } func (this *SendingWorker) ProcessAck(number uint32) { - if number-this.firstUnacknowledged > this.windowSize { + if number-this.firstUnacknowledged > this.window.Size() { return } @@ -350,11 +354,11 @@ func (this *SendingWorker) OnPacketLoss(lossRate uint32) { } else if lossRate <= 5 { this.controlWindow += this.controlWindow / 4 } - if this.controlWindow < 4 { - this.controlWindow = 4 + if this.controlWindow < 16 { + this.controlWindow = 16 } - if this.controlWindow > 2*this.windowSize { - this.controlWindow = 2 * this.windowSize + if this.controlWindow > 2*effectiveConfig.GetSendingInFlightSize() { + this.controlWindow = 2 * effectiveConfig.GetSendingInFlightSize() } } @@ -362,7 +366,7 @@ func (this *SendingWorker) Flush() { this.Lock() defer this.Unlock() - cwnd := this.firstUnacknowledged + this.windowSize + cwnd := this.firstUnacknowledged + effectiveConfig.GetSendingInFlightSize() if cwnd > this.remoteNextNumber { cwnd = this.remoteNextNumber } @@ -370,7 +374,7 @@ func (this *SendingWorker) Flush() { cwnd = this.firstUnacknowledged + this.controlWindow } - for !this.queue.IsEmpty() && _itimediff(this.nextNumber, cwnd) < 0 { + for !this.queue.IsEmpty() && !this.window.IsFull() { seg := this.queue.Pop() seg.Number = this.nextNumber seg.timeout = this.kcp.current @@ -380,7 +384,7 @@ func (this *SendingWorker) Flush() { this.nextNumber++ } - this.window.Flush(this.kcp.current, this.kcp.fastresend, this.kcp.rx_rto) + this.window.Flush(this.kcp.current, this.kcp.fastresend, this.kcp.rx_rto, cwnd) } func (this *SendingWorker) CloseWrite() {