diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 134bb44da..d326836d2 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -70,6 +70,10 @@ func (v *Stream) Read() (*buf.Buffer, error) { } func (v *Stream) Write(data *buf.Buffer) (err error) { + if data.IsEmpty() { + return + } + select { case <-v.destClose: return io.ErrClosedPipe diff --git a/transport/ray/direct_test.go b/transport/ray/direct_test.go index e8b4ed197..637f801a4 100644 --- a/transport/ray/direct_test.go +++ b/transport/ray/direct_test.go @@ -13,7 +13,9 @@ func TestStreamIO(t *testing.T) { assert := assert.On(t) stream := NewStream() - assert.Error(stream.Write(buf.New())).IsNil() + b1 := buf.New() + b1.AppendBytes('a') + assert.Error(stream.Write(b1)).IsNil() _, err := stream.Read() assert.Error(err).IsNil() @@ -22,7 +24,9 @@ func TestStreamIO(t *testing.T) { _, err = stream.Read() assert.Error(err).Equals(io.EOF) - err = stream.Write(buf.New()) + b2 := buf.New() + b2.AppendBytes('b') + err = stream.Write(b2) assert.Error(err).Equals(io.ErrClosedPipe) } @@ -30,7 +34,9 @@ func TestStreamClose(t *testing.T) { assert := assert.On(t) stream := NewStream() - assert.Error(stream.Write(buf.New())).IsNil() + b1 := buf.New() + b1.AppendBytes('a') + assert.Error(stream.Write(b1)).IsNil() stream.Close()