diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index c62d35ee3..4d2db151e 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -69,7 +69,6 @@ type UDPSession struct { rd time.Time // read deadline wd time.Time // write deadline chReadEvent chan struct{} - chWriteEvent chan struct{} writer io.WriteCloser since int64 } @@ -79,7 +78,6 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr, sess := new(UDPSession) sess.local = local sess.chReadEvent = make(chan struct{}, 1) - sess.chWriteEvent = make(chan struct{}, 1) sess.remote = remote sess.block = block sess.writer = writerCloser @@ -95,7 +93,10 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr, opt = OptionClose } ext.Prepend([]byte{byte(cmd), byte(opt)}) - sess.output(ext) + go sess.output(ext) + } + if sess.state == ConnStateReadyToClose && sess.kcp.WaitSnd() == 0 { + go sess.NotifyTermination() } }) sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd) @@ -130,24 +131,17 @@ func (s *UDPSession) Read(b []byte) (int, error) { return 0, errTimeout } } + s.Unlock() + s.kcpAccess.Lock() nBytes := s.kcp.Recv(b) + s.kcpAccess.Unlock() if nBytes > 0 { - s.Unlock() return nBytes, nil } - - var timeout <-chan time.Time - if !s.rd.IsZero() { - delay := s.rd.Sub(time.Now()) - timeout = time.After(delay) - } - - s.Unlock() select { case <-s.chReadEvent: - case <-timeout: - return 0, errTimeout + case <-time.After(time.Second): } } } @@ -182,7 +176,8 @@ func (s *UDPSession) Write(b []byte) (int, error) { return 0, errTimeout } - time.Sleep(time.Duration(s.kcp.WaitSnd()*5) * time.Millisecond) + // Sending windows is 1024 for the moment. This amount is not gonna sent in 1 sec. + time.Sleep(time.Second) } } @@ -207,11 +202,13 @@ func (this *UDPSession) NotifyTermination() { this.Unlock() break } + this.Unlock() buffer := alloc.NewSmallBuffer().Clear() buffer.AppendBytes(byte(CommandTerminate), byte(OptionClose), byte(0), byte(0), byte(0), byte(0)) this.output(buffer) + time.Sleep(time.Second) - this.Unlock() + } this.Terminate() } @@ -284,9 +281,15 @@ func (s *UDPSession) output(payload *alloc.Buffer) { func (s *UDPSession) updateTask() { for s.state != ConnStateClosed { current := s.Elapsed() + s.kcpAccess.Lock() s.kcp.Update(current) interval := s.kcp.Check(s.Elapsed()) - time.Sleep(time.Duration(interval) * time.Millisecond) + s.kcpAccess.Unlock() + sleep := interval - current + if sleep < 10 { + sleep = 10 + } + time.Sleep(time.Duration(sleep) * time.Millisecond) } } @@ -297,13 +300,6 @@ func (s *UDPSession) notifyReadEvent() { } } -func (s *UDPSession) notifyWriteEvent() { - select { - case s.chWriteEvent <- struct{}{}: - default: - } -} - func (this *UDPSession) MarkPeerClose() { this.Lock() defer this.Unlock()