From 5ae8bfbda16461d565acf9ad8eba0708936f8f64 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 20 Oct 2017 23:23:29 +0200 Subject: [PATCH] fix data race in ray --- transport/ray/direct.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 3fcea2c84..f9406f8e3 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -143,26 +143,38 @@ func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) { } } -func (s *Stream) Write(data buf.MultiBuffer) error { - if data.IsEmpty() { +func (s *Stream) waitForStreamSize() error { + if streamSizeLimit == 0 { return nil } + s.access.RLock() + defer s.access.RUnlock() + for streamSizeLimit > 0 && s.size >= streamSizeLimit { select { case <-s.ctx.Done(): return io.ErrClosedPipe case <-s.readSignal: - s.access.RLock() if s.err || s.close { - data.Release() - s.access.RUnlock() return io.ErrClosedPipe } - s.access.RUnlock() } } + return nil +} + +func (s *Stream) Write(data buf.MultiBuffer) error { + if data.IsEmpty() { + return nil + } + + if err := s.waitForStreamSize(); err != nil { + data.Release() + return err + } + s.access.Lock() defer s.access.Unlock()