diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index 59ea06e83..209a282af 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -98,13 +98,14 @@ func (this *RountTripInfo) SmoothedTime() uint32 { // Connection is a KCP connection over UDP. type Connection struct { - block Authenticator - local, remote net.Addr - rd time.Time - wd time.Time // write deadline - writer io.WriteCloser - since int64 - dataInputCond *sync.Cond + block Authenticator + local, remote net.Addr + rd time.Time + wd time.Time // write deadline + writer io.WriteCloser + since int64 + dataInputCond *sync.Cond + dataOutputCond *sync.Cond conv uint16 state State @@ -135,6 +136,7 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, conn.writer = writerCloser conn.since = nowMillisec() conn.dataInputCond = sync.NewCond(new(sync.Mutex)) + conn.dataOutputCond = sync.NewCond(new(sync.Mutex)) authWriter := &AuthenticationWriter{ Authenticator: block, @@ -222,12 +224,25 @@ func (this *Connection) Write(b []byte) (int, error) { } } + var timer *time.Timer + if !this.wd.IsZero() { + duration := this.wd.Sub(time.Now()) + if duration <= 0 { + return totalWritten, errTimeout + } + timer = time.AfterFunc(duration, this.dataOutputCond.Signal) + } + this.dataOutputCond.L.Lock() + this.dataOutputCond.Wait() + this.dataOutputCond.L.Unlock() + + if timer != nil { + timer.Stop() + } + if !this.wd.IsZero() && this.wd.Before(time.Now()) { return totalWritten, errTimeout } - - // Sending windows is 1024 for the moment. This amount is not gonna sent in 1 sec. - time.Sleep(time.Second) } } @@ -260,6 +275,7 @@ func (this *Connection) Close() error { } this.dataInputCond.Broadcast() + this.dataOutputCond.Broadcast() state := this.State() if state == StateReadyToClose || @@ -410,6 +426,7 @@ func (this *Connection) Input(data []byte) int { case *AckSegment: this.HandleOption(seg.Opt) this.sendingWorker.ProcessSegment(current, seg) + this.dataOutputCond.Signal() case *CmdOnlySegment: this.HandleOption(seg.Opt) if seg.Cmd == SegmentCommandTerminated {