1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-22 18:17:52 -05:00

cleanup kcp

This commit is contained in:
Darien Raymond 2017-12-03 14:56:00 +01:00
parent bcfcba396b
commit 85a93e9602
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
3 changed files with 107 additions and 262 deletions

View File

@ -1,155 +0,0 @@
package kcp
import (
"sync"
)
const (
defaultRTT = 100
queueSize = 10
)
type Queue struct {
value [queueSize]uint32
start uint32
length uint32
}
func (v *Queue) Push(value uint32) {
if v.length < queueSize {
v.value[v.length] = value
v.length++
return
}
v.value[v.start] = value
v.start++
if v.start == queueSize {
v.start = 0
}
}
func (v *Queue) Max() uint32 {
max := v.value[0]
for i := 1; i < queueSize; i++ {
if v.value[i] > max {
max = v.value[i]
}
}
return max
}
func (v *Queue) Min() uint32 {
max := v.value[0]
for i := 1; i < queueSize; i++ {
if v.value[i] < max {
max = v.value[i]
}
}
return max
}
type CongestionState byte
const (
CongestionStateRTTProbe CongestionState = iota
CongestionStateBandwidthProbe
CongestionStateTransfer
)
type Congestion struct {
sync.RWMutex
state CongestionState
stateSince uint32
limit uint32 // bytes per 1000 seconds
rtt uint32 // millisec
rttHistory Queue
rttUpdateTime uint32
initialThroughput uint32 // bytes per 1000 seconds
cycleStartTime uint32
cycleBytesConfirmed uint32
cycleBytesSent uint32
cycleBytesLimit uint32
cycle uint32
bestCycleBytesConfirmed uint32
bestCycleBytesSent uint32
}
func (v *Congestion) SetState(current uint32, state CongestionState) {
v.state = state
v.stateSince = current
}
func (v *Congestion) Update(current uint32) {
switch v.state {
case CongestionStateRTTProbe:
if v.rtt > 0 {
v.SetState(current, CongestionStateBandwidthProbe)
v.cycleStartTime = current
v.cycleBytesConfirmed = 0
v.cycleBytesSent = 0
v.cycleBytesLimit = v.initialThroughput * v.rtt / 1000 / 1000
}
case CongestionStateBandwidthProbe:
if current-v.cycleStartTime >= v.rtt {
}
}
}
func (v *Congestion) AddBytesConfirmed(current uint32, bytesConfirmed uint32) {
v.Lock()
defer v.Unlock()
v.cycleBytesConfirmed += bytesConfirmed
v.Update(current)
}
func (v *Congestion) UpdateRTT(current uint32, rtt uint32) {
v.Lock()
defer v.Unlock()
if v.state == CongestionStateRTTProbe || rtt < v.rtt {
v.rtt = rtt
v.rttUpdateTime = current
}
v.Update(current)
}
func (v *Congestion) GetBytesLimit() uint32 {
v.RLock()
defer v.RUnlock()
if v.state == CongestionStateRTTProbe {
return v.initialThroughput/1000/(1000/defaultRTT) - v.cycleBytesSent
}
return v.cycleBytesLimit
}
func (v *Congestion) RoundTripTime() uint32 {
v.RLock()
defer v.RUnlock()
if v.state == CongestionStateRTTProbe {
return defaultRTT
}
return v.rtt
}
func (v *Congestion) Timeout() uint32 {
v.RLock()
defer v.RUnlock()
if v.state == CongestionStateRTTProbe {
return defaultRTT * 3 / 2
}
return v.rtt * 3 / 2
}

View File

@ -166,54 +166,54 @@ func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
return worker return worker
} }
func (v *ReceivingWorker) Release() { func (w *ReceivingWorker) Release() {
v.Lock() w.Lock()
v.leftOver.Release() w.leftOver.Release()
v.Unlock() w.Unlock()
} }
func (v *ReceivingWorker) ProcessSendingNext(number uint32) { func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
v.acklist.Clear(number) w.acklist.Clear(number)
} }
func (v *ReceivingWorker) ProcessSegment(seg *DataSegment) { func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
number := seg.Number number := seg.Number
idx := number - v.nextNumber idx := number - w.nextNumber
if idx >= v.windowSize { if idx >= w.windowSize {
return return
} }
v.acklist.Clear(seg.SendingNext) w.acklist.Clear(seg.SendingNext)
v.acklist.Add(number, seg.Timestamp) w.acklist.Add(number, seg.Timestamp)
if !v.window.Set(idx, seg) { if !w.window.Set(idx, seg) {
seg.Release() seg.Release()
} }
} }
func (v *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer { func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
if v.leftOver != nil { if w.leftOver != nil {
mb := v.leftOver mb := w.leftOver
v.leftOver = nil w.leftOver = nil
return mb return mb
} }
mb := buf.NewMultiBufferCap(32) mb := buf.NewMultiBufferCap(32)
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
for { for {
seg := v.window.RemoveFirst() seg := w.window.RemoveFirst()
if seg == nil { if seg == nil {
break break
} }
v.window.Advance() w.window.Advance()
v.nextNumber++ w.nextNumber++
mb.Append(seg.Data) mb.Append(seg.Data)
seg.Data = nil seg.Data = nil
seg.Release() seg.Release()
@ -222,11 +222,11 @@ func (v *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
return mb return mb
} }
func (v *ReceivingWorker) Read(b []byte) int { func (w *ReceivingWorker) Read(b []byte) int {
mb := v.ReadMultiBuffer() mb := w.ReadMultiBuffer()
nBytes, _ := mb.Read(b) nBytes, _ := mb.Read(b)
if !mb.IsEmpty() { if !mb.IsEmpty() {
v.leftOver = mb w.leftOver = mb
} }
return nBytes return nBytes
} }
@ -244,30 +244,30 @@ func (w *ReceivingWorker) NextNumber() uint32 {
return w.nextNumber return w.nextNumber
} }
func (v *ReceivingWorker) Flush(current uint32) { func (w *ReceivingWorker) Flush(current uint32) {
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
v.acklist.Flush(current, v.conn.roundTrip.Timeout()) w.acklist.Flush(current, w.conn.roundTrip.Timeout())
} }
func (v *ReceivingWorker) Write(seg Segment) error { func (w *ReceivingWorker) Write(seg Segment) error {
ackSeg := seg.(*AckSegment) ackSeg := seg.(*AckSegment)
ackSeg.Conv = v.conn.conv ackSeg.Conv = w.conn.conv
ackSeg.ReceivingNext = v.nextNumber ackSeg.ReceivingNext = w.nextNumber
ackSeg.ReceivingWindow = v.nextNumber + v.windowSize ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
if v.conn.State() == StateReadyToClose { if w.conn.State() == StateReadyToClose {
ackSeg.Option = SegmentOptionClose ackSeg.Option = SegmentOptionClose
} }
return v.conn.output.Write(ackSeg) return w.conn.output.Write(ackSeg)
} }
func (v *ReceivingWorker) CloseRead() { func (*ReceivingWorker) CloseRead() {
} }
func (v *ReceivingWorker) UpdateNecessary() bool { func (w *ReceivingWorker) UpdateNecessary() bool {
v.RLock() w.RLock()
defer v.RUnlock() defer w.RUnlock()
return len(v.acklist.numbers) > 0 return len(w.acklist.numbers) > 0
} }

View File

@ -36,94 +36,94 @@ func NewSendingWindow(size uint32, writer SegmentWriter, onPacketLoss func(uint3
return window return window
} }
func (v *SendingWindow) Release() { func (sw *SendingWindow) Release() {
if v == nil { if sw == nil {
return return
} }
v.len = 0 sw.len = 0
for _, seg := range v.data { for _, seg := range sw.data {
seg.Release() seg.Release()
} }
} }
func (v *SendingWindow) Len() int { func (sw *SendingWindow) Len() int {
return int(v.len) return int(sw.len)
} }
func (v *SendingWindow) IsEmpty() bool { func (sw *SendingWindow) IsEmpty() bool {
return v.len == 0 return sw.len == 0
} }
func (v *SendingWindow) Size() uint32 { func (sw *SendingWindow) Size() uint32 {
return v.cap return sw.cap
} }
func (v *SendingWindow) IsFull() bool { func (sw *SendingWindow) IsFull() bool {
return v.len == v.cap return sw.len == sw.cap
} }
func (v *SendingWindow) Push(number uint32, data []byte) { func (sw *SendingWindow) Push(number uint32, data []byte) {
pos := (v.start + v.len) % v.cap pos := (sw.start + sw.len) % sw.cap
v.data[pos].SetData(data) sw.data[pos].SetData(data)
v.data[pos].Number = number sw.data[pos].Number = number
v.data[pos].timeout = 0 sw.data[pos].timeout = 0
v.data[pos].transmit = 0 sw.data[pos].transmit = 0
v.inuse[pos] = true sw.inuse[pos] = true
if v.len > 0 { if sw.len > 0 {
v.next[v.last] = pos sw.next[sw.last] = pos
v.prev[pos] = v.last sw.prev[pos] = sw.last
} }
v.last = pos sw.last = pos
v.len++ sw.len++
} }
func (v *SendingWindow) FirstNumber() uint32 { func (sw *SendingWindow) FirstNumber() uint32 {
return v.data[v.start].Number return sw.data[sw.start].Number
} }
func (v *SendingWindow) Clear(una uint32) { func (sw *SendingWindow) Clear(una uint32) {
for !v.IsEmpty() && v.data[v.start].Number < una { for !sw.IsEmpty() && sw.data[sw.start].Number < una {
v.Remove(0) sw.Remove(0)
} }
} }
func (v *SendingWindow) Remove(idx uint32) bool { func (sw *SendingWindow) Remove(idx uint32) bool {
if v.IsEmpty() { if sw.IsEmpty() {
return false return false
} }
pos := (v.start + idx) % v.cap pos := (sw.start + idx) % sw.cap
if !v.inuse[pos] { if !sw.inuse[pos] {
return false return false
} }
v.inuse[pos] = false sw.inuse[pos] = false
v.totalInFlightSize-- sw.totalInFlightSize--
if pos == v.start && pos == v.last { if pos == sw.start && pos == sw.last {
v.len = 0 sw.len = 0
v.start = 0 sw.start = 0
v.last = 0 sw.last = 0
} else if pos == v.start { } else if pos == sw.start {
delta := v.next[pos] - v.start delta := sw.next[pos] - sw.start
if v.next[pos] < v.start { if sw.next[pos] < sw.start {
delta = v.next[pos] + v.cap - v.start delta = sw.next[pos] + sw.cap - sw.start
} }
v.start = v.next[pos] sw.start = sw.next[pos]
v.len -= delta sw.len -= delta
} else if pos == v.last { } else if pos == sw.last {
v.last = v.prev[pos] sw.last = sw.prev[pos]
} else { } else {
v.next[v.prev[pos]] = v.next[pos] sw.next[sw.prev[pos]] = sw.next[pos]
v.prev[v.next[pos]] = v.prev[pos] sw.prev[sw.next[pos]] = sw.prev[pos]
} }
return true return true
} }
func (v *SendingWindow) HandleFastAck(number uint32, rto uint32) { func (sw *SendingWindow) HandleFastAck(number uint32, rto uint32) {
if v.IsEmpty() { if sw.IsEmpty() {
return return
} }
v.Visit(func(seg *DataSegment) bool { sw.Visit(func(seg *DataSegment) bool {
if number == seg.Number || number-seg.Number > 0x7FFFFFFF { if number == seg.Number || number-seg.Number > 0x7FFFFFFF {
return false return false
} }
@ -135,33 +135,33 @@ func (v *SendingWindow) HandleFastAck(number uint32, rto uint32) {
}) })
} }
func (v *SendingWindow) Visit(visitor func(seg *DataSegment) bool) { func (sw *SendingWindow) Visit(visitor func(seg *DataSegment) bool) {
if v.IsEmpty() { if sw.IsEmpty() {
return return
} }
for i := v.start; ; i = v.next[i] { for i := sw.start; ; i = sw.next[i] {
if !visitor(&v.data[i]) || i == v.last { if !visitor(&sw.data[i]) || i == sw.last {
break break
} }
} }
} }
func (v *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32) { func (sw *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32) {
if v.IsEmpty() { if sw.IsEmpty() {
return return
} }
var lost uint32 var lost uint32
var inFlightSize uint32 var inFlightSize uint32
v.Visit(func(segment *DataSegment) bool { sw.Visit(func(segment *DataSegment) bool {
if current-segment.timeout >= 0x7FFFFFFF { if current-segment.timeout >= 0x7FFFFFFF {
return true return true
} }
if segment.transmit == 0 { if segment.transmit == 0 {
// First time // First time
v.totalInFlightSize++ sw.totalInFlightSize++
} else { } else {
lost++ lost++
} }
@ -169,7 +169,7 @@ func (v *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32
segment.Timestamp = current segment.Timestamp = current
segment.transmit++ segment.transmit++
v.writer.Write(segment) sw.writer.Write(segment)
inFlightSize++ inFlightSize++
if inFlightSize >= maxInFlightSize { if inFlightSize >= maxInFlightSize {
return false return false
@ -177,9 +177,9 @@ func (v *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32
return true return true
}) })
if v.onPacketLoss != nil && inFlightSize > 0 && v.totalInFlightSize != 0 { if sw.onPacketLoss != nil && inFlightSize > 0 && sw.totalInFlightSize != 0 {
rate := lost * 100 / v.totalInFlightSize rate := lost * 100 / sw.totalInFlightSize
v.onPacketLoss(rate) sw.onPacketLoss(rate)
} }
} }