1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-07-03 20:45:23 +00:00

remove sending queue

This commit is contained in:
Darien Raymond 2016-08-24 15:47:14 +02:00
parent e6e0419958
commit 5d20e3f70b
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
2 changed files with 27 additions and 156 deletions

View File

@ -2,8 +2,6 @@ package kcp
import ( import (
"sync" "sync"
"v2ray.com/core/common/alloc"
) )
type SendingWindow struct { type SendingWindow struct {
@ -67,6 +65,13 @@ func (this *SendingWindow) First() *DataSegment {
return this.data[this.start] return this.data[this.start]
} }
func (this *SendingWindow) Last() *DataSegment {
if this.IsEmpty() {
return nil
}
return this.data[this.last]
}
func (this *SendingWindow) Clear(una uint32) { func (this *SendingWindow) Clear(una uint32) {
for !this.IsEmpty() && this.data[this.start].Number < una { for !this.IsEmpty() && this.data[this.start].Number < una {
this.Remove(0) this.Remove(0)
@ -171,80 +176,10 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxI
} }
} }
type SendingQueue struct {
start uint32
cap uint32
len uint32
list []*alloc.Buffer
}
func NewSendingQueue(size uint32) *SendingQueue {
return &SendingQueue{
start: 0,
cap: size,
list: make([]*alloc.Buffer, size),
len: 0,
}
}
func (this *SendingQueue) IsFull() bool {
return this.len == this.cap
}
func (this *SendingQueue) IsEmpty() bool {
return this.len == 0
}
func (this *SendingQueue) Pop() *alloc.Buffer {
if this.IsEmpty() {
return nil
}
seg := this.list[this.start]
this.list[this.start] = nil
this.len--
this.start++
if this.start == this.cap {
this.start = 0
}
if this.IsEmpty() {
this.start = 0
}
return seg
}
func (this *SendingQueue) Last() *alloc.Buffer {
if this.IsEmpty() {
return nil
}
return this.list[(this.start+this.len-1+this.cap)%this.cap]
}
func (this *SendingQueue) Push(seg *alloc.Buffer) {
if this.IsFull() {
return
}
this.list[(this.start+this.len)%this.cap] = seg
this.len++
}
func (this *SendingQueue) Clear() {
for i := uint32(0); i < this.len; i++ {
this.list[(i+this.start)%this.cap].Release()
this.list[(i+this.start)%this.cap] = nil
}
this.start = 0
this.len = 0
}
func (this *SendingQueue) Len() uint32 {
return this.len
}
type SendingWorker struct { type SendingWorker struct {
sync.RWMutex sync.RWMutex
conn *Connection conn *Connection
window *SendingWindow window *SendingWindow
queue *SendingQueue
firstUnacknowledged uint32 firstUnacknowledged uint32
nextNumber uint32 nextNumber uint32
remoteNextNumber uint32 remoteNextNumber uint32
@ -256,12 +191,11 @@ type SendingWorker struct {
func NewSendingWorker(kcp *Connection) *SendingWorker { func NewSendingWorker(kcp *Connection) *SendingWorker {
worker := &SendingWorker{ worker := &SendingWorker{
conn: kcp, conn: kcp,
queue: NewSendingQueue(effectiveConfig.GetSendingQueueSize()),
fastResend: 2, fastResend: 2,
remoteNextNumber: 32, remoteNextNumber: 32,
controlWindow: effectiveConfig.GetSendingInFlightSize(), controlWindow: effectiveConfig.GetSendingInFlightSize(),
} }
worker.window = NewSendingWindow(effectiveConfig.GetSendingWindowSize(), worker, worker.OnPacketLoss) worker.window = NewSendingWindow(effectiveConfig.GetSendingQueueSize(), worker, worker.OnPacketLoss)
return worker return worker
} }
@ -301,19 +235,6 @@ func (this *SendingWorker) ProcessAck(number uint32) {
this.FindFirstUnacknowledged() this.FindFirstUnacknowledged()
} }
func (this *SendingWorker) FillWindow(current uint32) {
for !this.queue.IsEmpty() && !this.window.IsFull() {
seg := NewDataSegment()
seg.Data = this.queue.Pop()
seg.Number = this.nextNumber
seg.timeout = current
seg.ackSkipped = 0
seg.transmit = 0
this.window.Push(seg)
this.nextNumber++
}
}
func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) { func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) {
defer seg.Release() defer seg.Release()
@ -339,33 +260,40 @@ func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) {
} }
this.window.HandleFastAck(maxack) this.window.HandleFastAck(maxack)
this.FillWindow(current)
} }
func (this *SendingWorker) Push(b []byte) int { func (this *SendingWorker) Push(b []byte) int {
nBytes := 0 nBytes := 0
this.Lock() this.Lock()
defer this.Unlock() defer this.Unlock()
if !this.queue.IsEmpty() { if !this.window.IsEmpty() {
lastSeg := this.queue.Last() lastSeg := this.window.Last()
if lastSeg.Len() < int(this.conn.mss) { dataLen := lastSeg.Data.Len()
delta := int(this.conn.mss) - lastSeg.Len() if dataLen < int(this.conn.mss) {
delta := int(this.conn.mss) - dataLen
if delta > len(b) { if delta > len(b) {
delta = len(b) delta = len(b)
} }
lastSeg.Append(b[:delta]) lastSeg.Data.Append(b[:delta])
b = b[delta:] b = b[delta:]
nBytes += delta nBytes += delta
} }
} }
for len(b) > 0 && !this.queue.IsFull() { for len(b) > 0 && !this.window.IsFull() {
var size int var size int
if len(b) > int(this.conn.mss) { if len(b) > int(this.conn.mss) {
size = int(this.conn.mss) size = int(this.conn.mss)
} else { } else {
size = len(b) size = len(b)
} }
this.queue.Push(AllocateBuffer().Clear().Append(b[:size])) seg := NewDataSegment()
seg.Data = AllocateBuffer().Clear().Append(b[:size])
seg.Number = this.nextNumber
seg.timeout = 0
seg.ackSkipped = 0
seg.transmit = 0
this.window.Push(seg)
this.nextNumber++
b = b[size:] b = b[size:]
nBytes += size nBytes += size
} }
@ -431,8 +359,9 @@ func (this *SendingWorker) Flush(current uint32) {
cwnd = this.firstUnacknowledged + this.controlWindow cwnd = this.firstUnacknowledged + this.controlWindow
} }
this.FillWindow(current) if !this.window.IsEmpty() {
this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd) this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd)
}
} }
func (this *SendingWorker) CloseWrite() { func (this *SendingWorker) CloseWrite() {
@ -440,12 +369,11 @@ func (this *SendingWorker) CloseWrite() {
defer this.Unlock() defer this.Unlock()
this.window.Clear(0xFFFFFFFF) this.window.Clear(0xFFFFFFFF)
this.queue.Clear()
} }
func (this *SendingWorker) IsEmpty() bool { func (this *SendingWorker) IsEmpty() bool {
this.RLock() this.RLock()
defer this.RUnlock() defer this.RUnlock()
return this.window.IsEmpty() && this.queue.IsEmpty() return this.window.IsEmpty()
} }

View File

@ -3,67 +3,10 @@ package kcp_test
import ( import (
"testing" "testing"
"v2ray.com/core/common/alloc"
"v2ray.com/core/testing/assert" "v2ray.com/core/testing/assert"
. "v2ray.com/core/transport/internet/kcp" . "v2ray.com/core/transport/internet/kcp"
) )
func TestSendingQueue(t *testing.T) {
assert := assert.On(t)
queue := NewSendingQueue(3)
seg0 := alloc.NewLocalBuffer(512)
seg1 := alloc.NewLocalBuffer(512)
seg2 := alloc.NewLocalBuffer(512)
seg3 := alloc.NewLocalBuffer(512)
assert.Bool(queue.IsEmpty()).IsTrue()
assert.Bool(queue.IsFull()).IsFalse()
queue.Push(seg0)
assert.Bool(queue.IsEmpty()).IsFalse()
queue.Push(seg1)
queue.Push(seg2)
assert.Bool(queue.IsFull()).IsTrue()
assert.Pointer(queue.Pop()).Equals(seg0)
queue.Push(seg3)
assert.Bool(queue.IsFull()).IsTrue()
assert.Pointer(queue.Pop()).Equals(seg1)
assert.Pointer(queue.Pop()).Equals(seg2)
assert.Pointer(queue.Pop()).Equals(seg3)
assert.Int(int(queue.Len())).Equals(0)
}
func TestSendingQueueClear(t *testing.T) {
assert := assert.On(t)
queue := NewSendingQueue(3)
seg0 := alloc.NewLocalBuffer(512)
seg1 := alloc.NewLocalBuffer(512)
seg2 := alloc.NewLocalBuffer(512)
seg3 := alloc.NewLocalBuffer(512)
queue.Push(seg0)
assert.Bool(queue.IsEmpty()).IsFalse()
queue.Clear()
assert.Bool(queue.IsEmpty()).IsTrue()
queue.Push(seg1)
queue.Push(seg2)
queue.Push(seg3)
queue.Clear()
assert.Bool(queue.IsEmpty()).IsTrue()
}
func TestSendingWindow(t *testing.T) { func TestSendingWindow(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)