From 476b3c68d24bfbc7f1c1a6f3e37ed9c949ff76b9 Mon Sep 17 00:00:00 2001 From: v2ray Date: Thu, 14 Jul 2016 12:57:14 +0200 Subject: [PATCH] introduce a new state: peer terminating --- transport/internet/kcp/connection.go | 26 ++++++++++++++++++++------ transport/internet/kcp/listener.go | 7 +++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index 727537f9a..59ea06e83 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -22,11 +22,12 @@ var ( type State int32 const ( - StateActive State = 0 - StateReadyToClose State = 1 - StatePeerClosed State = 2 - StateTerminating State = 3 - StateTerminated State = 4 + StateActive State = 0 + StateReadyToClose State = 1 + StatePeerClosed State = 2 + StateTerminating State = 3 + StatePeerTerminating State = 4 + StateTerminated State = 5 ) const ( @@ -177,6 +178,10 @@ func (this *Connection) Read(b []byte) (int, error) { return nBytes, nil } + if this.State() == StatePeerTerminating { + return 0, io.EOF + } + var timer *time.Timer if !this.rd.IsZero() { duration := this.rd.Sub(time.Now()) @@ -240,6 +245,8 @@ func (this *Connection) SetState(state State) { case StateTerminating: this.receivingWorker.CloseRead() this.sendingWorker.CloseWrite() + case StatePeerTerminating: + this.sendingWorker.CloseWrite() case StateTerminated: this.receivingWorker.CloseRead() this.sendingWorker.CloseWrite() @@ -268,6 +275,9 @@ func (this *Connection) Close() error { if state == StatePeerClosed { this.SetState(StateTerminating) } + if state == StatePeerTerminating { + this.SetState(StateTerminated) + } return nil } @@ -405,8 +415,9 @@ func (this *Connection) Input(data []byte) int { if seg.Cmd == SegmentCommandTerminated { state := this.State() if state == StateActive || - state == StateReadyToClose || state == StatePeerClosed { + this.SetState(StatePeerTerminating) + } else if state == StateReadyToClose { this.SetState(StateTerminating) } else if state == StateTerminating { this.SetState(StateTerminated) @@ -450,6 +461,9 @@ func (this *Connection) flush() { } return } + if this.State() == StatePeerTerminating && current-atomic.LoadUint32(&this.stateBeginTime) > 4000 { + this.SetState(StateTerminating) + } if this.State() == StateReadyToClose && current-atomic.LoadUint32(&this.stateBeginTime) > 15000 { this.SetState(StateTerminating) diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index 2d948e03e..c95cda534 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -59,10 +59,17 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) { if !this.running { return } + if payload.Len() < 4 { + return + } conv := serial.BytesToUint16(payload.Value) + cmd := SegmentCommand(payload.Value[2]) sourceId := src.NetAddr() + "|" + serial.Uint16ToString(conv) conn, found := this.sessions[sourceId] if !found { + if cmd == SegmentCommandTerminated { + return + } log.Debug("KCP|Listener: Creating session with id(", sourceId, ") from ", src) writer := &Writer{ id: sourceId,