From 66b82e4ab7cd4a79c2e603352728f70936cfdfcc Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 9 Nov 2017 00:55:28 +0100 Subject: [PATCH] cleanup MultiBuffer --- app/proxyman/mux/mux_test.go | 2 +- app/proxyman/mux/reader.go | 2 +- app/proxyman/mux/writer.go | 2 +- common/buf/multi_buffer.go | 12 ++++++------ common/buf/reader.go | 2 +- common/buf/writer.go | 2 +- common/crypto/auth.go | 2 +- common/crypto/auth_test.go | 8 ++++---- common/crypto/chunk.go | 4 +++- common/crypto/chunk_test.go | 2 +- proxy/vmess/inbound/inbound.go | 2 +- proxy/vmess/outbound/outbound.go | 2 +- transport/internet/kcp/receiving.go | 2 +- transport/ray/direct.go | 6 +++--- 14 files changed, 26 insertions(+), 24 deletions(-) diff --git a/app/proxyman/mux/mux_test.go b/app/proxyman/mux/mux_test.go index 70cebf69c..eacdd7d2d 100644 --- a/app/proxyman/mux/mux_test.go +++ b/app/proxyman/mux/mux_test.go @@ -14,7 +14,7 @@ import ( ) func readAll(reader buf.Reader) (buf.MultiBuffer, error) { - mb := buf.NewMultiBuffer() + var mb buf.MultiBuffer for { b, err := reader.Read() if err == io.EOF { diff --git a/app/proxyman/mux/reader.go b/app/proxyman/mux/reader.go index 298eec058..acdfe75b0 100644 --- a/app/proxyman/mux/reader.go +++ b/app/proxyman/mux/reader.go @@ -94,7 +94,7 @@ func (r *StreamReader) Read() (buf.MultiBuffer, error) { r.leftOver = int(size) } - mb := buf.NewMultiBuffer() + mb := buf.NewMultiBufferCap(32) for r.leftOver > 0 { readLen := buf.Size if r.leftOver < readLen { diff --git a/app/proxyman/mux/writer.go b/app/proxyman/mux/writer.go index cee325e81..39a3158c3 100644 --- a/app/proxyman/mux/writer.go +++ b/app/proxyman/mux/writer.go @@ -71,7 +71,7 @@ func (w *Writer) writeData(mb buf.MultiBuffer) error { return err } - mb2 := buf.NewMultiBuffer() + mb2 := buf.NewMultiBufferCap(len(mb) + 1) mb2.Append(frame) mb2.AppendMulti(mb) return w.writer.Write(mb2) diff --git a/common/buf/multi_buffer.go b/common/buf/multi_buffer.go index 5a684fa4e..724466307 100644 --- a/common/buf/multi_buffer.go +++ b/common/buf/multi_buffer.go @@ -20,7 +20,7 @@ type MultiBufferReader interface { // ReadAllToMultiBuffer reads all content from the reader into a MultiBuffer, until EOF. func ReadAllToMultiBuffer(reader io.Reader) (MultiBuffer, error) { - mb := NewMultiBuffer() + mb := NewMultiBufferCap(128) for { b := New() @@ -55,9 +55,9 @@ func ReadAllToBytes(reader io.Reader) ([]byte, error) { // MultiBuffer is a list of Buffers. The order of Buffer matters. type MultiBuffer []*Buffer -// NewMultiBuffer creates a new MultiBuffer instance. -func NewMultiBuffer() MultiBuffer { - return MultiBuffer(make([]*Buffer, 0, 128)) +// NewMultiBufferCap creates a new MultiBuffer instance. +func NewMultiBufferCap(capacity int) MultiBuffer { + return MultiBuffer(make([]*Buffer, 0, capacity)) } // NewMultiBufferValue wraps a list of Buffers into MultiBuffer. @@ -149,7 +149,7 @@ func (mb *MultiBuffer) Release() { b.Release() (*mb)[i] = nil } - *mb = (*mb)[:0] + *mb = nil } // ToNetBuffers converts this MultiBuffer to net.Buffers. The return net.Buffers points to the same content of the MultiBuffer. @@ -163,7 +163,7 @@ func (mb MultiBuffer) ToNetBuffers() net.Buffers { // SliceBySize splits the begining of this MultiBuffer into another one, for at most size bytes. func (mb *MultiBuffer) SliceBySize(size int) MultiBuffer { - slice := NewMultiBuffer() + slice := NewMultiBufferCap(10) sliceSize := 0 endIndex := len(*mb) for i, b := range *mb { diff --git a/common/buf/reader.go b/common/buf/reader.go index 8715075a7..6d52ea0a8 100644 --- a/common/buf/reader.go +++ b/common/buf/reader.go @@ -41,7 +41,7 @@ func (r *BytesToBufferReader) Read() (MultiBuffer, error) { return nil, err } - mb := NewMultiBuffer() + mb := NewMultiBufferCap(nBytes/Size + 1) mb.Write(r.buffer[:nBytes]) return mb, nil } diff --git a/common/buf/writer.go b/common/buf/writer.go index 36fa0edc9..32122a153 100644 --- a/common/buf/writer.go +++ b/common/buf/writer.go @@ -75,7 +75,7 @@ type bytesToBufferWriter struct { // Write implements io.Writer. func (w *bytesToBufferWriter) Write(payload []byte) (int, error) { - mb := NewMultiBuffer() + mb := NewMultiBufferCap(len(payload)/Size + 1) mb.Write(payload) if err := w.writer.Write(mb); err != nil { return 0, err diff --git a/common/crypto/auth.go b/common/crypto/auth.go index 4c619cdbf..289e14f29 100644 --- a/common/crypto/auth.go +++ b/common/crypto/auth.go @@ -157,7 +157,7 @@ func (r *AuthenticationReader) Read() (buf.MultiBuffer, error) { return nil, err } - mb := buf.NewMultiBuffer() + var mb buf.MultiBuffer if r.transferType == protocol.TransferTypeStream { mb.Write(b) } else { diff --git a/common/crypto/auth_test.go b/common/crypto/auth_test.go index cd10b1b63..602fa34ec 100644 --- a/common/crypto/auth_test.go +++ b/common/crypto/auth_test.go @@ -44,7 +44,7 @@ func TestAuthenticationReaderWriter(t *testing.T) { assert(writer.Write(buf.NewMultiBufferValue(payload)), IsNil) assert(cache.Len(), Equals, 83360) - assert(writer.Write(buf.NewMultiBuffer()), IsNil) + assert(writer.Write(buf.MultiBuffer{}), IsNil) assert(err, IsNil) reader := NewAuthenticationReader(&AEADAuthenticator{ @@ -55,7 +55,7 @@ func TestAuthenticationReaderWriter(t *testing.T) { AdditionalDataGenerator: &NoOpBytesGenerator{}, }, PlainChunkSizeParser{}, cache, protocol.TransferTypeStream) - mb := buf.NewMultiBuffer() + var mb buf.MultiBuffer for mb.Len() < len(rawPayload) { mb2, err := reader.Read() @@ -95,7 +95,7 @@ func TestAuthenticationReaderWriterPacket(t *testing.T) { AdditionalDataGenerator: &NoOpBytesGenerator{}, }, PlainChunkSizeParser{}, cache, protocol.TransferTypePacket) - payload := buf.NewMultiBuffer() + var payload buf.MultiBuffer pb1 := buf.New() pb1.Append([]byte("abcd")) payload.Append(pb1) @@ -106,7 +106,7 @@ func TestAuthenticationReaderWriterPacket(t *testing.T) { assert(writer.Write(payload), IsNil) assert(cache.Len(), GreaterThan, 0) - assert(writer.Write(buf.NewMultiBuffer()), IsNil) + assert(writer.Write(buf.MultiBuffer{}), IsNil) assert(err, IsNil) reader := NewAuthenticationReader(&AEADAuthenticator{ diff --git a/common/crypto/chunk.go b/common/crypto/chunk.go index f284ef19d..af43e346c 100644 --- a/common/crypto/chunk.go +++ b/common/crypto/chunk.go @@ -48,6 +48,7 @@ func NewChunkStreamReader(sizeDecoder ChunkSizeDecoder, reader io.Reader) *Chunk sizeDecoder: sizeDecoder, reader: buf.NewReader(reader), buffer: make([]byte, sizeDecoder.SizeBytes()), + leftOver: buf.NewMultiBufferCap(16), } } @@ -129,8 +130,9 @@ func NewChunkStreamWriter(sizeEncoder ChunkSizeEncoder, writer io.Writer) *Chunk } func (w *ChunkStreamWriter) Write(mb buf.MultiBuffer) error { - mb2Write := buf.NewMultiBuffer() const sliceSize = 8192 + mbLen := mb.Len() + mb2Write := buf.NewMultiBufferCap(mbLen / buf.Size + mbLen / sliceSize + 2) for { slice := mb.SliceBySize(sliceSize) diff --git a/common/crypto/chunk_test.go b/common/crypto/chunk_test.go index 67144dcef..2c9763cb6 100644 --- a/common/crypto/chunk_test.go +++ b/common/crypto/chunk_test.go @@ -25,7 +25,7 @@ func TestChunkStreamIO(t *testing.T) { b.AppendBytes('e', 'f', 'g') assert(writer.Write(buf.NewMultiBufferValue(b)), IsNil) - assert(writer.Write(buf.NewMultiBuffer()), IsNil) + assert(writer.Write(buf.MultiBuffer{}), IsNil) assert(cache.Len(), Equals, 13) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 877e51ea2..faff13407 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -163,7 +163,7 @@ func transferResponse(timer signal.ActivityUpdater, session *encoding.ServerSess } if request.Option.Has(protocol.RequestOptionChunkStream) { - if err := bodyWriter.Write(buf.NewMultiBuffer()); err != nil { + if err := bodyWriter.Write(buf.MultiBuffer{}); err != nil { return err } } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 3e395254d..a883d1fb5 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -132,7 +132,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } if request.Option.Has(protocol.RequestOptionChunkStream) { - if err := bodyWriter.Write(buf.NewMultiBuffer()); err != nil { + if err := bodyWriter.Write(buf.MultiBuffer{}); err != nil { return err } } diff --git a/transport/internet/kcp/receiving.go b/transport/internet/kcp/receiving.go index 5024628be..8c281bb93 100644 --- a/transport/internet/kcp/receiving.go +++ b/transport/internet/kcp/receiving.go @@ -203,7 +203,7 @@ func (v *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer { return mb } - mb := buf.NewMultiBuffer() + mb := buf.NewMultiBufferCap(32) v.Lock() defer v.Unlock() diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 34a01fc8f..2378e394b 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -197,10 +197,10 @@ func (s *Stream) Write(data buf.MultiBuffer) error { } if s.data == nil { - s.data = data - } else { - s.data.AppendMulti(data) + s.data = buf.NewMultiBufferCap(128) } + + s.data.AppendMulti(data) s.size += uint64(data.Len()) s.notifyWrite()