diff --git a/transport/pipe/impl.go b/transport/pipe/impl.go index d5e929c94..c940e8abd 100644 --- a/transport/pipe/impl.go +++ b/transport/pipe/impl.go @@ -41,6 +41,7 @@ type pipe struct { } var errBufferFull = errors.New("buffer full") +var errSlowDown = errors.New("slow down") func (p *pipe) getState(forRead bool) error { switch p.state { @@ -122,11 +123,11 @@ func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error { if p.data == nil { p.data = mb - } else { - p.data, _ = buf.MergeMulti(p.data, mb) + return nil } - return nil + p.data, _ = buf.MergeMulti(p.data, mb) + return errSlowDown } func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -136,17 +137,25 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { for { err := p.writeMultiBufferInternal(mb) - switch { - case err == nil: + if err == nil { + p.readSignal.Signal() + return nil + } + + if err == errSlowDown { p.readSignal.Signal() // Yield current goroutine. Hopefully the reading counterpart can pick up the payload. runtime.Gosched() return nil - case err == errBufferFull && p.option.discardOverflow: + } + + if err == errBufferFull && p.option.discardOverflow { buf.ReleaseMulti(mb) return nil - case err != errBufferFull: + } + + if err != errBufferFull { buf.ReleaseMulti(mb) p.readSignal.Signal() return err diff --git a/transport/pipe/pipe_test.go b/transport/pipe/pipe_test.go index 06521d37e..f49c476af 100644 --- a/transport/pipe/pipe_test.go +++ b/transport/pipe/pipe_test.go @@ -17,14 +17,18 @@ func TestPipeReadWrite(t *testing.T) { assert := With(t) pReader, pWriter := New(WithSizeLimit(1024)) - payload := []byte{'a', 'b', 'c', 'd'} + b := buf.New() - b.Write(payload) + b.WriteString("abcd") assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil) + b2 := buf.New() + b2.WriteString("efg") + assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b2}), IsNil) + rb, err := pReader.ReadMultiBuffer() assert(err, IsNil) - assert(rb.String(), Equals, b.String()) + assert(rb.String(), Equals, "abcdefg") } func TestPipeCloseError(t *testing.T) {