1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-11-18 10:26:01 -05:00
v2fly/transport/internet/kcp/receiving.go

288 lines
5.4 KiB
Go
Raw Normal View History

2016-06-25 15:35:18 -04:00
package kcp
2016-06-30 08:51:49 -04:00
import (
"sync"
2016-12-09 05:35:27 -05:00
"v2ray.com/core/common/buf"
2016-06-30 08:51:49 -04:00
)
2016-06-25 15:35:18 -04:00
type ReceivingWindow struct {
start uint32
size uint32
2016-06-29 04:34:34 -04:00
list []*DataSegment
2016-06-25 15:35:18 -04:00
}
func NewReceivingWindow(size uint32) *ReceivingWindow {
return &ReceivingWindow{
start: 0,
size: size,
2016-06-29 04:34:34 -04:00
list: make([]*DataSegment, size),
2016-06-25 15:35:18 -04:00
}
}
2018-01-17 11:36:14 -05:00
func (w *ReceivingWindow) Size() uint32 {
return w.size
2016-06-25 15:35:18 -04:00
}
2018-07-02 09:30:23 -04:00
func (w *ReceivingWindow) Position(idx uint32) (uint32, bool) {
if idx >= w.size {
return 0, false
}
return (w.start + idx) % w.size, true
2016-06-25 15:35:18 -04:00
}
2018-01-17 11:36:14 -05:00
func (w *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
2018-07-02 09:30:23 -04:00
pos, ok := w.Position(idx)
if !ok {
return false
}
2018-01-17 11:36:14 -05:00
if w.list[pos] != nil {
2016-06-25 15:35:18 -04:00
return false
}
2018-01-17 11:36:14 -05:00
w.list[pos] = value
2016-06-25 15:35:18 -04:00
return true
}
2018-01-17 11:36:14 -05:00
func (w *ReceivingWindow) Remove(idx uint32) *DataSegment {
2018-07-02 09:30:23 -04:00
pos, ok := w.Position(idx)
if !ok {
return nil
}
2018-01-17 11:36:14 -05:00
e := w.list[pos]
w.list[pos] = nil
2016-06-25 15:35:18 -04:00
return e
}
2018-01-17 11:36:14 -05:00
func (w *ReceivingWindow) RemoveFirst() *DataSegment {
return w.Remove(0)
2016-06-25 15:35:18 -04:00
}
2017-02-17 18:04:25 -05:00
func (w *ReceivingWindow) HasFirst() bool {
2018-07-02 09:30:23 -04:00
pos, _ := w.Position(0)
return w.list[pos] != nil
2017-02-17 18:04:25 -05:00
}
2018-01-17 11:36:14 -05:00
func (w *ReceivingWindow) Advance() {
w.start++
if w.start == w.size {
w.start = 0
2016-06-25 15:35:18 -04:00
}
}
2016-06-27 16:34:46 -04:00
2016-07-02 05:33:34 -04:00
type AckList struct {
2016-07-02 15:26:50 -04:00
writer SegmentWriter
2016-06-27 16:34:46 -04:00
timestamps []uint32
numbers []uint32
2016-06-30 16:19:30 -04:00
nextFlush []uint32
2016-11-28 16:06:32 -05:00
flushCandidates []uint32
2016-12-02 10:49:33 -05:00
dirty bool
2016-06-30 16:19:30 -04:00
}
2016-07-02 17:18:12 -04:00
func NewAckList(writer SegmentWriter) *AckList {
2016-07-02 05:33:34 -04:00
return &AckList{
2016-11-28 16:06:32 -05:00
writer: writer,
2016-12-02 10:49:33 -05:00
timestamps: make([]uint32, 0, 128),
numbers: make([]uint32, 0, 128),
nextFlush: make([]uint32, 0, 128),
2016-11-28 16:06:32 -05:00
flushCandidates: make([]uint32, 0, 128),
2016-06-30 16:19:30 -04:00
}
2016-06-27 16:34:46 -04:00
}
2018-01-17 11:36:14 -05:00
func (l *AckList) Add(number uint32, timestamp uint32) {
l.timestamps = append(l.timestamps, timestamp)
l.numbers = append(l.numbers, number)
l.nextFlush = append(l.nextFlush, 0)
l.dirty = true
2016-06-27 16:34:46 -04:00
}
2018-01-17 11:36:14 -05:00
func (l *AckList) Clear(una uint32) {
2016-06-27 16:34:46 -04:00
count := 0
2018-01-17 11:36:14 -05:00
for i := 0; i < len(l.numbers); i++ {
if l.numbers[i] < una {
2016-11-18 10:19:13 -05:00
continue
2016-06-27 16:34:46 -04:00
}
2016-11-18 10:19:13 -05:00
if i != count {
2018-01-17 11:36:14 -05:00
l.numbers[count] = l.numbers[i]
l.timestamps[count] = l.timestamps[i]
l.nextFlush[count] = l.nextFlush[i]
2016-11-18 10:19:13 -05:00
}
count++
2016-06-27 16:34:46 -04:00
}
2018-01-17 11:36:14 -05:00
if count < len(l.numbers) {
l.numbers = l.numbers[:count]
l.timestamps = l.timestamps[:count]
l.nextFlush = l.nextFlush[:count]
l.dirty = true
2016-06-29 17:41:04 -04:00
}
2016-06-27 16:34:46 -04:00
}
2018-01-17 11:36:14 -05:00
func (l *AckList) Flush(current uint32, rto uint32) {
l.flushCandidates = l.flushCandidates[:0]
2016-11-28 16:06:32 -05:00
2016-07-05 04:28:23 -04:00
seg := NewAckSegment()
2018-01-17 11:36:14 -05:00
for i := 0; i < len(l.numbers); i++ {
if l.nextFlush[i] > current {
if len(l.flushCandidates) < cap(l.flushCandidates) {
l.flushCandidates = append(l.flushCandidates, l.numbers[i])
2016-11-28 16:06:32 -05:00
}
2016-11-18 10:19:13 -05:00
continue
}
2018-01-17 11:36:14 -05:00
seg.PutNumber(l.numbers[i])
seg.PutTimestamp(l.timestamps[i])
2016-11-29 11:12:09 -05:00
timeout := rto / 2
2016-11-18 10:19:13 -05:00
if timeout < 20 {
timeout = 20
2016-06-30 16:19:30 -04:00
}
2018-01-17 11:36:14 -05:00
l.nextFlush[i] = current + timeout
2016-11-29 02:31:19 -05:00
if seg.IsFull() {
2018-01-17 11:36:14 -05:00
l.writer.Write(seg)
2016-11-29 02:31:19 -05:00
seg.Release()
seg = NewAckSegment()
2018-01-17 11:36:14 -05:00
l.dirty = false
2016-11-29 02:31:19 -05:00
}
2016-06-27 16:34:46 -04:00
}
2018-01-17 11:36:14 -05:00
if l.dirty || !seg.IsEmpty() {
for _, number := range l.flushCandidates {
2016-11-28 16:06:32 -05:00
if seg.IsFull() {
break
}
seg.PutNumber(number)
}
2018-01-17 11:36:14 -05:00
l.writer.Write(seg)
2016-07-05 08:08:08 -04:00
seg.Release()
2018-01-17 11:36:14 -05:00
l.dirty = false
2016-06-30 16:19:30 -04:00
}
2016-07-02 15:26:50 -04:00
}
type ReceivingWorker struct {
2016-07-12 11:56:36 -04:00
sync.RWMutex
2016-07-06 11:34:38 -04:00
conn *Connection
leftOver buf.MultiBuffer
2016-07-06 11:34:38 -04:00
window *ReceivingWindow
acklist *AckList
nextNumber uint32
windowSize uint32
2016-07-02 15:26:50 -04:00
}
2016-07-05 17:02:52 -04:00
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
2018-07-02 09:30:23 -04:00
windowsSize := kcp.Config.GetReceivingInFlightSize()
if windowsSize > kcp.Config.GetReceivingBufferSize() {
windowsSize = kcp.Config.GetReceivingBufferSize()
}
2016-07-02 15:26:50 -04:00
worker := &ReceivingWorker{
2016-07-05 17:02:52 -04:00
conn: kcp,
2016-10-02 17:43:58 -04:00
window: NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),
2018-07-02 09:30:23 -04:00
windowSize: windowsSize,
2016-07-02 15:26:50 -04:00
}
2016-07-02 17:18:12 -04:00
worker.acklist = NewAckList(worker)
2016-07-02 15:26:50 -04:00
return worker
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) Release() {
w.Lock()
w.leftOver.Release()
w.Unlock()
2016-11-21 16:41:12 -05:00
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
w.Lock()
defer w.Unlock()
2016-07-06 11:34:38 -04:00
2017-12-03 08:56:00 -05:00
w.acklist.Clear(number)
2016-07-02 15:26:50 -04:00
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
w.Lock()
defer w.Unlock()
2016-07-06 11:34:38 -04:00
2016-07-02 15:26:50 -04:00
number := seg.Number
2017-12-03 08:56:00 -05:00
idx := number - w.nextNumber
if idx >= w.windowSize {
2016-07-02 15:26:50 -04:00
return
}
2017-12-03 08:56:00 -05:00
w.acklist.Clear(seg.SendingNext)
w.acklist.Add(number, seg.Timestamp)
2016-07-02 15:26:50 -04:00
2017-12-03 08:56:00 -05:00
if !w.window.Set(idx, seg) {
2016-07-02 15:26:50 -04:00
seg.Release()
}
2016-08-24 17:51:53 -04:00
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
if w.leftOver != nil {
mb := w.leftOver
w.leftOver = nil
return mb
2016-08-24 17:51:53 -04:00
}
2016-07-02 15:26:50 -04:00
2017-11-08 18:55:28 -05:00
mb := buf.NewMultiBufferCap(32)
2017-12-03 08:56:00 -05:00
w.Lock()
defer w.Unlock()
for {
2017-12-03 08:56:00 -05:00
seg := w.window.RemoveFirst()
2016-07-02 15:26:50 -04:00
if seg == nil {
break
}
2017-12-03 08:56:00 -05:00
w.window.Advance()
w.nextNumber++
2017-12-03 16:53:00 -05:00
mb.Append(seg.Detach())
2016-08-24 17:51:53 -04:00
seg.Release()
}
return mb
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) Read(b []byte) int {
mb := w.ReadMultiBuffer()
nBytes, _ := mb.Read(b)
if !mb.IsEmpty() {
2017-12-03 08:56:00 -05:00
w.leftOver = mb
}
return nBytes
2016-07-02 15:26:50 -04:00
}
2017-02-17 18:04:25 -05:00
func (w *ReceivingWorker) IsDataAvailable() bool {
w.RLock()
defer w.RUnlock()
return w.window.HasFirst()
}
func (w *ReceivingWorker) NextNumber() uint32 {
w.RLock()
defer w.RUnlock()
return w.nextNumber
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) Flush(current uint32) {
w.Lock()
defer w.Unlock()
2016-07-06 11:34:38 -04:00
2017-12-03 08:56:00 -05:00
w.acklist.Flush(current, w.conn.roundTrip.Timeout())
2016-07-02 15:26:50 -04:00
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) Write(seg Segment) error {
2016-07-02 16:17:41 -04:00
ackSeg := seg.(*AckSegment)
2017-12-14 17:24:40 -05:00
ackSeg.Conv = w.conn.meta.Conversation
2017-12-03 08:56:00 -05:00
ackSeg.ReceivingNext = w.nextNumber
ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
if w.conn.State() == StateReadyToClose {
2016-07-14 16:52:00 -04:00
ackSeg.Option = SegmentOptionClose
2016-07-02 15:26:50 -04:00
}
2017-12-03 08:56:00 -05:00
return w.conn.output.Write(ackSeg)
2016-07-02 15:26:50 -04:00
}
2017-12-03 08:56:00 -05:00
func (*ReceivingWorker) CloseRead() {
2016-07-02 15:26:50 -04:00
}
2017-12-03 08:56:00 -05:00
func (w *ReceivingWorker) UpdateNecessary() bool {
w.RLock()
defer w.RUnlock()
2017-02-17 18:04:25 -05:00
2017-12-03 08:56:00 -05:00
return len(w.acklist.numbers) > 0
}