diff --git a/transport/ray/direct.go b/transport/ray/direct.go index f9406f8e3..e26348c24 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -143,15 +143,20 @@ func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) { } } +// Size returns the number of bytes hold in the Stream. +func (s *Stream) Size() uint64 { + s.access.RLock() + defer s.access.RUnlock() + + return s.size +} + func (s *Stream) waitForStreamSize() error { if streamSizeLimit == 0 { return nil } - s.access.RLock() - defer s.access.RUnlock() - - for streamSizeLimit > 0 && s.size >= streamSizeLimit { + for s.Size() >= streamSizeLimit { select { case <-s.ctx.Done(): return io.ErrClosedPipe