From f3a83c57ab8a28295dc69cfd427d719ba7de2c87 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Tue, 11 Oct 2016 12:24:19 +0200 Subject: [PATCH] optimize ping and updater logic --- transport/internet/kcp/connection.go | 55 ++++++++++++++-------------- transport/internet/kcp/listener.go | 5 ++- transport/internet/kcp/sending.go | 23 ++++++++---- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index ecd80a3d7..7b3ad43b5 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -233,10 +233,11 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, }, conn.updateTask) conn.pingUpdater = NewUpdater( - 3000, // 3 seconds + 5000, // 5 seconds func() bool { return conn.State() != StateTerminated }, func() bool { return conn.State() == StateTerminated }, conn.updateTask) + conn.pingUpdater.WakeUp() return conn } @@ -333,24 +334,22 @@ func (this *Connection) SetState(state State) { switch state { case StateReadyToClose: this.receivingWorker.CloseRead() - this.dataUpdater.WakeUp() case StatePeerClosed: this.sendingWorker.CloseWrite() - this.dataUpdater.WakeUp() case StateTerminating: this.receivingWorker.CloseRead() this.sendingWorker.CloseWrite() - this.dataUpdater.interval = time.Second - this.dataUpdater.WakeUp() + this.pingUpdater.interval = time.Second case StatePeerTerminating: this.sendingWorker.CloseWrite() - this.dataUpdater.WakeUp() + this.pingUpdater.interval = time.Second case StateTerminated: this.receivingWorker.CloseRead() this.sendingWorker.CloseWrite() - this.dataUpdater.interval = time.Second + this.pingUpdater.interval = time.Second this.dataUpdater.WakeUp() - this.Terminate() + this.pingUpdater.WakeUp() + go this.Terminate() } } @@ -488,7 +487,6 @@ func (this *Connection) OnPeerClosed() { func (this *Connection) Input(data []byte) int { current := this.Elapsed() atomic.StoreUint32(&this.lastIncomingTime, current) - this.dataUpdater.WakeUp() var seg Segment for { @@ -502,10 +500,12 @@ func (this *Connection) Input(data []byte) int { this.HandleOption(seg.Option) this.receivingWorker.ProcessSegment(seg) this.dataInputCond.Signal() + this.dataUpdater.WakeUp() case *AckSegment: this.HandleOption(seg.Option) this.sendingWorker.ProcessSegment(current, seg) this.dataOutputCond.Signal() + this.dataUpdater.WakeUp() case *CmdOnlySegment: this.HandleOption(seg.Option) if seg.Command == CommandTerminate { @@ -545,12 +545,7 @@ func (this *Connection) flush() { if this.State() == StateTerminating { log.Debug("KCP|Connection: #", this.conv, " sending terminating cmd.") - seg := NewCmdOnlySegment() - defer seg.Release() - - seg.Conv = this.conv - seg.Command = CommandTerminate - this.output.Write(seg) + this.Ping(current, CommandTerminate) this.output.Flush() if current-atomic.LoadUint32(&this.stateBeginTime) > 8000 { @@ -570,19 +565,8 @@ func (this *Connection) flush() { this.receivingWorker.Flush(current) this.sendingWorker.Flush(current) - if current-atomic.LoadUint32(&this.lastPingTime) >= 3000 { - seg := NewCmdOnlySegment() - seg.Conv = this.conv - seg.Command = CommandPing - seg.ReceivinNext = this.receivingWorker.nextNumber - seg.SendingNext = this.sendingWorker.firstUnacknowledged - seg.PeerRTO = this.roundTrip.Timeout() - if this.State() == StateReadyToClose { - seg.Option = SegmentOptionClose - } - this.output.Write(seg) - this.lastPingTime = current - seg.Release() + if current-atomic.LoadUint32(&this.lastPingTime) >= 1000 { + this.Ping(current, CommandPing) } // flash remain segments @@ -592,3 +576,18 @@ func (this *Connection) flush() { func (this *Connection) State() State { return State(atomic.LoadInt32((*int32)(&this.state))) } + +func (this *Connection) Ping(current uint32, cmd Command) { + seg := NewCmdOnlySegment() + seg.Conv = this.conv + seg.Command = cmd + seg.ReceivinNext = this.receivingWorker.nextNumber + seg.SendingNext = this.sendingWorker.firstUnacknowledged + seg.PeerRTO = this.roundTrip.Timeout() + if this.State() == StateReadyToClose { + seg.Option = SegmentOptionClose + } + this.output.Write(seg) + atomic.StoreUint32(&this.lastPingTime, current) + seg.Release() +} diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index c60f1fedb..77ce76a04 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -139,7 +139,10 @@ func (this *Listener) Accept() (internet.Connection, error) { return nil, ErrClosedListener } select { - case conn := <-this.awaitingConns: + case conn, open := <-this.awaitingConns: + if !open { + break + } if this.tlsConfig != nil { tlsConn := tls.Server(conn, this.tlsConfig) return v2tls.NewConnection(tlsConn), nil diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index a76a24cac..b57beb289 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -171,13 +171,14 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxI type SendingWorker struct { sync.RWMutex - conn *Connection - window *SendingWindow - firstUnacknowledged uint32 - nextNumber uint32 - remoteNextNumber uint32 - controlWindow uint32 - fastResend uint32 + conn *Connection + window *SendingWindow + firstUnacknowledged uint32 + firstUnacknowledgedUpdated bool + nextNumber uint32 + remoteNextNumber uint32 + controlWindow uint32 + fastResend uint32 } func NewSendingWorker(kcp *Connection) *SendingWorker { @@ -205,11 +206,15 @@ func (this *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) { // Private: Visible for testing. func (this *SendingWorker) FindFirstUnacknowledged() { + v := this.firstUnacknowledged if !this.window.IsEmpty() { this.firstUnacknowledged = this.window.First().Number } else { this.firstUnacknowledged = this.nextNumber } + if v != this.firstUnacknowledged { + this.firstUnacknowledgedUpdated = true + } } // Private: Visible for testing. @@ -322,7 +327,11 @@ func (this *SendingWorker) Flush(current uint32) { if !this.window.IsEmpty() { this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd) + } else if this.firstUnacknowledgedUpdated { + this.conn.Ping(current, CommandPing) } + + this.firstUnacknowledgedUpdated = false } func (this *SendingWorker) CloseWrite() {