1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-02 15:36:41 -05:00

redo kcp.ReceivingWindow

This commit is contained in:
Darien Raymond 2018-07-08 12:24:37 +02:00
parent 69774c99dc
commit bbeae5be48
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
2 changed files with 22 additions and 92 deletions

View File

@ -7,66 +7,36 @@ import (
)
type ReceivingWindow struct {
start uint32
size uint32
list []*DataSegment
cache map[uint32]*DataSegment
}
func NewReceivingWindow(size uint32) *ReceivingWindow {
func NewReceivingWindow() *ReceivingWindow {
return &ReceivingWindow{
start: 0,
size: size,
list: make([]*DataSegment, size),
cache: make(map[uint32]*DataSegment),
}
}
func (w *ReceivingWindow) Size() uint32 {
return w.size
}
func (w *ReceivingWindow) Position(idx uint32) (uint32, bool) {
if idx >= w.size {
return 0, false
}
return (w.start + idx) % w.size, true
}
func (w *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
pos, ok := w.Position(idx)
if !ok {
func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
_, f := w.cache[id]
if f {
return false
}
if w.list[pos] != nil {
return false
}
w.list[pos] = value
w.cache[id] = value
return true
}
func (w *ReceivingWindow) Remove(idx uint32) *DataSegment {
pos, ok := w.Position(idx)
if !ok {
func (w *ReceivingWindow) Has(id uint32) bool {
_, f := w.cache[id]
return f
}
func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
v, f := w.cache[id]
if !f {
return nil
}
e := w.list[pos]
w.list[pos] = nil
return e
}
func (w *ReceivingWindow) RemoveFirst() *DataSegment {
return w.Remove(0)
}
func (w *ReceivingWindow) HasFirst() bool {
pos, _ := w.Position(0)
return w.list[pos] != nil
}
func (w *ReceivingWindow) Advance() {
w.start++
if w.start == w.size {
w.start = 0
}
delete(w.cache, id)
return v
}
type AckList struct {
@ -167,15 +137,10 @@ type ReceivingWorker struct {
}
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
windowsSize := kcp.Config.GetReceivingInFlightSize()
if windowsSize > kcp.Config.GetReceivingBufferSize() {
windowsSize = kcp.Config.GetReceivingBufferSize()
}
worker := &ReceivingWorker{
conn: kcp,
window: NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),
windowSize: windowsSize,
window: NewReceivingWindow(),
windowSize: kcp.Config.GetReceivingInFlightSize(),
}
worker.acklist = NewAckList(worker)
return worker
@ -206,7 +171,7 @@ func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
w.acklist.Clear(seg.SendingNext)
w.acklist.Add(number, seg.Timestamp)
if !w.window.Set(idx, seg) {
if !w.window.Set(seg.Number, seg) {
seg.Release()
}
}
@ -223,11 +188,10 @@ func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
w.Lock()
defer w.Unlock()
for {
seg := w.window.RemoveFirst()
seg := w.window.Remove(w.nextNumber)
if seg == nil {
break
}
w.window.Advance()
w.nextNumber++
mb.Append(seg.Detach())
seg.Release()
@ -248,7 +212,7 @@ func (w *ReceivingWorker) Read(b []byte) int {
func (w *ReceivingWorker) IsDataAvailable() bool {
w.RLock()
defer w.RUnlock()
return w.window.HasFirst()
return w.window.Has(w.nextNumber)
}
func (w *ReceivingWorker) NextNumber() uint32 {

View File

@ -1,34 +0,0 @@
package kcp_test
import (
"testing"
. "v2ray.com/core/transport/internet/kcp"
. "v2ray.com/ext/assert"
)
func TestRecivingWindow(t *testing.T) {
assert := With(t)
window := NewReceivingWindow(3)
seg0 := &DataSegment{}
seg1 := &DataSegment{}
seg2 := &DataSegment{}
seg3 := &DataSegment{}
assert(window.Set(0, seg0), IsTrue)
assert(window.RemoveFirst(), Equals, seg0)
e := window.RemoveFirst()
assert(e, IsNil)
assert(window.Set(1, seg1), IsTrue)
assert(window.Set(2, seg2), IsTrue)
window.Advance()
assert(window.Set(2, seg3), IsTrue)
assert(window.RemoveFirst(), Equals, seg1)
assert(window.Remove(1), Equals, seg2)
assert(window.Remove(2), Equals, seg3)
}