From 00841583d2351b77d2d7737f3d433afd5fe107be Mon Sep 17 00:00:00 2001 From: v2ray Date: Fri, 5 Aug 2016 20:59:33 +0200 Subject: [PATCH] reduce ack packet size and send peer RTO --- transport/internet/kcp/connection.go | 26 +++++++++++++++---- transport/internet/kcp/receiving.go | 3 ++- transport/internet/kcp/segment.go | 36 +++++++++++++++++--------- transport/internet/kcp/segment_test.go | 4 +-- transport/internet/kcp/sending.go | 8 +++--- 5 files changed, 53 insertions(+), 24 deletions(-) diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index d2bafec98..dd021757b 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -50,13 +50,26 @@ func nowMillisec() int64 { type RountTripInfo struct { sync.RWMutex - variation uint32 - srtt uint32 - rto uint32 - minRtt uint32 + variation uint32 + srtt uint32 + rto uint32 + minRtt uint32 + updatedTimestamp uint32 } -func (this *RountTripInfo) Update(rtt uint32) { +func (this *RountTripInfo) UpdatePeerRTO(rto uint32, current uint32) { + this.Lock() + defer this.Unlock() + + if current-this.updatedTimestamp < 5000 { + return + } + + this.updatedTimestamp = current + this.rto = rto +} + +func (this *RountTripInfo) Update(rtt uint32, current uint32) { if rtt > 0x7FFFFFFF { return } @@ -89,6 +102,7 @@ func (this *RountTripInfo) Update(rtt uint32) { rto = 10000 } this.rto = rto * 3 / 2 + this.updatedTimestamp = current } func (this *RountTripInfo) Timeout() uint32 { @@ -449,6 +463,7 @@ func (this *Connection) Input(data []byte) int { } this.sendingWorker.ProcessReceivingNext(seg.ReceivinNext) this.receivingWorker.ProcessSendingNext(seg.SendingNext) + this.roundTrip.UpdatePeerRTO(seg.PeerRTO, current) seg.Release() default: } @@ -503,6 +518,7 @@ func (this *Connection) flush() { seg.Command = CommandPing seg.ReceivinNext = this.receivingWorker.nextNumber seg.SendingNext = this.sendingWorker.firstUnacknowledged + seg.PeerRTO = this.roundTrip.Timeout() if this.State() == StateReadyToClose { seg.Option = SegmentOptionClose } diff --git a/transport/internet/kcp/receiving.go b/transport/internet/kcp/receiving.go index 6fd1f2519..33fbe6ea7 100644 --- a/transport/internet/kcp/receiving.go +++ b/transport/internet/kcp/receiving.go @@ -162,7 +162,8 @@ func (this *AckList) Flush(current uint32, rto uint32) { seg := NewAckSegment() for i := 0; i < len(this.numbers) && !seg.IsFull(); i++ { if this.nextFlush[i] <= current { - seg.PutNumber(this.numbers[i], this.timestamps[i]) + seg.PutNumber(this.numbers[i]) + seg.PutTimestamp(this.timestamps[i]) this.nextFlush[i] = current + rto/2 } } diff --git a/transport/internet/kcp/segment.go b/transport/internet/kcp/segment.go index 58853ea42..322db2463 100644 --- a/transport/internet/kcp/segment.go +++ b/transport/internet/kcp/segment.go @@ -74,19 +74,24 @@ type AckSegment struct { Option SegmentOption ReceivingWindow uint32 ReceivingNext uint32 + Timestamp uint32 Count byte NumberList []uint32 - TimestampList []uint32 } func NewAckSegment() *AckSegment { return new(AckSegment) } -func (this *AckSegment) PutNumber(number uint32, timestamp uint32) { +func (this *AckSegment) PutTimestamp(timestamp uint32) { + if timestamp-this.Timestamp < 0x7FFFFFFF { + this.Timestamp = timestamp + } +} + +func (this *AckSegment) PutNumber(number uint32) { this.Count++ this.NumberList = append(this.NumberList, number) - this.TimestampList = append(this.TimestampList, timestamp) } func (this *AckSegment) IsFull() bool { @@ -94,7 +99,7 @@ func (this *AckSegment) IsFull() bool { } func (this *AckSegment) ByteSize() int { - return 2 + 1 + 1 + 4 + 4 + 1 + int(this.Count)*4 + int(this.Count)*4 + return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int(this.Count)*4 } func (this *AckSegment) Bytes(b []byte) []byte { @@ -102,17 +107,16 @@ func (this *AckSegment) Bytes(b []byte) []byte { b = append(b, byte(CommandACK), byte(this.Option)) b = serial.Uint32ToBytes(this.ReceivingWindow, b) b = serial.Uint32ToBytes(this.ReceivingNext, b) + b = serial.Uint32ToBytes(this.Timestamp, b) b = append(b, this.Count) for i := byte(0); i < this.Count; i++ { b = serial.Uint32ToBytes(this.NumberList[i], b) - b = serial.Uint32ToBytes(this.TimestampList[i], b) } return b } func (this *AckSegment) Release() { this.NumberList = nil - this.TimestampList = nil } type CmdOnlySegment struct { @@ -121,6 +125,7 @@ type CmdOnlySegment struct { Option SegmentOption SendingNext uint32 ReceivinNext uint32 + PeerRTO uint32 } func NewCmdOnlySegment() *CmdOnlySegment { @@ -128,7 +133,7 @@ func NewCmdOnlySegment() *CmdOnlySegment { } func (this *CmdOnlySegment) ByteSize() int { - return 2 + 1 + 1 + 4 + 4 + return 2 + 1 + 1 + 4 + 4 + 4 } func (this *CmdOnlySegment) Bytes(b []byte) []byte { @@ -136,6 +141,7 @@ func (this *CmdOnlySegment) Bytes(b []byte) []byte { b = append(b, byte(this.Command), byte(this.Option)) b = serial.Uint32ToBytes(this.SendingNext, b) b = serial.Uint32ToBytes(this.ReceivinNext, b) + b = serial.Uint32ToBytes(this.PeerRTO, b) return b } @@ -186,7 +192,7 @@ func ReadSegment(buf []byte) (Segment, []byte) { seg := NewAckSegment() seg.Conv = conv seg.Option = opt - if len(buf) < 9 { + if len(buf) < 13 { return nil, nil } @@ -196,15 +202,18 @@ func ReadSegment(buf []byte) (Segment, []byte) { seg.ReceivingNext = serial.BytesToUint32(buf) buf = buf[4:] + seg.Timestamp = serial.BytesToUint32(buf) + buf = buf[4:] + count := int(buf[0]) buf = buf[1:] - if len(buf) < count*8 { + if len(buf) < count*4 { return nil, nil } for i := 0; i < count; i++ { - seg.PutNumber(serial.BytesToUint32(buf), serial.BytesToUint32(buf[4:])) - buf = buf[8:] + seg.PutNumber(serial.BytesToUint32(buf)) + buf = buf[4:] } return seg, buf @@ -215,7 +224,7 @@ func ReadSegment(buf []byte) (Segment, []byte) { seg.Command = cmd seg.Option = opt - if len(buf) < 8 { + if len(buf) < 12 { return nil, nil } @@ -225,5 +234,8 @@ func ReadSegment(buf []byte) (Segment, []byte) { seg.ReceivinNext = serial.BytesToUint32(buf) buf = buf[4:] + seg.PeerRTO = serial.BytesToUint32(buf) + buf = buf[4:] + return seg, buf } diff --git a/transport/internet/kcp/segment_test.go b/transport/internet/kcp/segment_test.go index 97fa67c7e..da4600385 100644 --- a/transport/internet/kcp/segment_test.go +++ b/transport/internet/kcp/segment_test.go @@ -48,9 +48,9 @@ func TestACKSegment(t *testing.T) { Conv: 1, ReceivingWindow: 2, ReceivingNext: 3, + Timestamp: 10, Count: 5, NumberList: []uint32{1, 3, 5, 7, 9}, - TimestampList: []uint32{2, 4, 6, 8, 10}, } nBytes := seg.ByteSize() @@ -64,8 +64,8 @@ func TestACKSegment(t *testing.T) { assert.Uint32(seg2.ReceivingWindow).Equals(seg.ReceivingWindow) assert.Uint32(seg2.ReceivingNext).Equals(seg.ReceivingNext) assert.Byte(seg2.Count).Equals(seg.Count) + assert.Uint32(seg2.Timestamp).Equals(seg.Timestamp) for i := byte(0); i < seg2.Count; i++ { - assert.Uint32(seg2.TimestampList[i]).Equals(seg.TimestampList[i]) assert.Uint32(seg2.NumberList[i]).Equals(seg.NumberList[i]) } } diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index c2aca9dc0..789f29b3d 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -317,14 +317,14 @@ func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) { this.remoteNextNumber = seg.ReceivingWindow } this.ProcessReceivingNextWithoutLock(seg.ReceivingNext) + if current-seg.Timestamp < 10000 { + this.conn.roundTrip.Update(current-seg.Timestamp, current) + } var maxack uint32 for i := 0; i < int(seg.Count); i++ { - timestamp := seg.TimestampList[i] number := seg.NumberList[i] - if current-timestamp < 10000 { - this.conn.roundTrip.Update(current - timestamp) - } + this.ProcessAck(number) if maxack < number { maxack = number