From f1231822f721286c7ece0c71244d9813e59e0807 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Mon, 19 Feb 2018 17:50:36 +0100 Subject: [PATCH] fix error handling in ray --- transport/ray/direct.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 53af0f4d5..9be2aff64 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -85,14 +85,14 @@ func (s *Stream) getData() (buf.MultiBuffer, error) { return mb, nil } - if s.close { - return nil, io.EOF - } - if s.err { return nil, io.ErrClosedPipe } + if s.close { + return nil, io.EOF + } + return nil, nil } @@ -121,7 +121,7 @@ func (s *Stream) ReadMultiBuffer() (buf.MultiBuffer, error) { select { case <-s.ctx.Done(): - return nil, io.EOF + return nil, s.ctx.Err() case <-s.writeSignal.Wait(): } } @@ -142,7 +142,7 @@ func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) { select { case <-s.ctx.Done(): - return nil, io.EOF + return nil, s.ctx.Err() case <-time.After(timeout): return nil, buf.ErrReadTimeout case <-s.writeSignal.Wait(): @@ -167,7 +167,7 @@ func (s *Stream) waitForStreamSize() error { for s.Size() >= streamSizeLimit { select { case <-s.ctx.Done(): - return io.ErrClosedPipe + return s.ctx.Err() case <-s.readSignal.Wait(): if s.err || s.close { return io.ErrClosedPipe @@ -227,7 +227,9 @@ func (s *Stream) CloseError() { s.data = nil s.size = 0 } + s.access.Unlock() + s.readSignal.Signal() s.writeSignal.Signal() - s.access.Unlock() + }