diff --git a/app/proxyman/mux/reader.go b/app/proxyman/mux/reader.go index 98b6e4400..f3a2574b6 100644 --- a/app/proxyman/mux/reader.go +++ b/app/proxyman/mux/reader.go @@ -55,7 +55,7 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) { if size <= buf.Size { b = buf.New() } else { - b = buf.NewLocal(int(size)) + b = buf.NewLocal(uint32(size)) } if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, int(size))); err != nil { b.Release() diff --git a/common/buf/buffer.go b/common/buf/buffer.go index ec2556ba1..4128cb325 100644 --- a/common/buf/buffer.go +++ b/common/buf/buffer.go @@ -12,8 +12,7 @@ type Supplier func([]byte) (int, error) // the buffer into an internal buffer pool, in order to recreate a buffer more // quickly. type Buffer struct { - v []byte - pool *Pool + v []byte start int32 end int32 @@ -24,11 +23,8 @@ func (b *Buffer) Release() { if b == nil || b.v == nil { return } - if b.pool != nil { - b.pool.Free(b) - } + FreeBytes(b.v) b.v = nil - b.pool = nil b.start = 0 b.end = 0 } @@ -178,13 +174,48 @@ func (b *Buffer) String() string { // New creates a Buffer with 0 length and 8K capacity. func New() *Buffer { - return mediumPool.Allocate() + return &Buffer{ + v: pool2k.Get().([]byte), + } } // NewLocal creates and returns a buffer with 0 length and given capacity on current thread. -func NewLocal(size int) *Buffer { +func NewLocal(size uint32) *Buffer { return &Buffer{ - v: make([]byte, size), - pool: nil, + v: NewBytes(size), + } +} + +func NewBytes(size uint32) []byte { + if size > 128*1024 { + return make([]byte, size) + } + + if size > 64*1024 { + return pool128k.Get().([]byte) + } + + if size > 8*1024 { + return pool64k.Get().([]byte) + } + + if size > 2*1024 { + return pool8k.Get().([]byte) + } + + return pool2k.Get().([]byte) +} + +func FreeBytes(b []byte) { + size := len(b) + switch { + case size >= 128*1024: + pool128k.Put(b) + case size >= 64*1024: + pool64k.Put(b) + case size >= 8*1024: + pool8k.Put(b) + case size >= 2*1024: + pool2k.Put(b) } } diff --git a/common/buf/buffer_pool.go b/common/buf/buffer_pool.go index 7904ee9a7..92dcd34cd 100644 --- a/common/buf/buffer_pool.go +++ b/common/buf/buffer_pool.go @@ -4,39 +4,29 @@ import ( "sync" ) -// Pool provides functionality to generate and recycle buffers on demand. -type Pool struct { - allocator *sync.Pool -} - -// NewPool creates a SyncPool with given buffer size. -func NewPool(bufferSize uint32) *Pool { - pool := &Pool{ - allocator: &sync.Pool{ - New: func() interface{} { return make([]byte, bufferSize) }, - }, - } - return pool -} - -// Allocate either returns a unused buffer from the pool, or generates a new one from system. -func (p *Pool) Allocate() *Buffer { - return &Buffer{ - v: p.allocator.Get().([]byte), - pool: p, - } -} - -// // Free recycles the given buffer. -func (p *Pool) Free(buffer *Buffer) { - if buffer.v != nil { - p.allocator.Put(buffer.v) - } -} - const ( // Size of a regular buffer. Size = 2 * 1024 ) -var mediumPool = NewPool(Size) +func createAllocFunc(size uint32) func() interface{} { + return func() interface{} { + return make([]byte, size) + } +} + +var pool2k = &sync.Pool{ + New: createAllocFunc(2 * 1024), +} + +var pool8k = &sync.Pool{ + New: createAllocFunc(8 * 1024), +} + +var pool64k = &sync.Pool{ + New: createAllocFunc(64 * 1024), +} + +var pool128k = &sync.Pool{ + New: createAllocFunc(128 * 1024), +} diff --git a/common/buf/buffer_test.go b/common/buf/buffer_test.go index f7146b49f..be0ae42ef 100644 --- a/common/buf/buffer_test.go +++ b/common/buf/buffer_test.go @@ -1,7 +1,6 @@ package buf_test import ( - "crypto/rand" "testing" . "v2ray.com/core/common/buf" @@ -42,32 +41,6 @@ func TestBufferString(t *testing.T) { assert(buffer.String(), Equals, "Test String") } -func TestBufferWrite(t *testing.T) { - assert := With(t) - - buffer := NewLocal(8) - nBytes, err := buffer.Write([]byte("abcd")) - assert(err, IsNil) - assert(nBytes, Equals, 4) - nBytes, err = buffer.Write([]byte("abcde")) - assert(err, IsNil) - assert(nBytes, Equals, 4) - assert(buffer.String(), Equals, "abcdabcd") -} - -func TestSyncPool(t *testing.T) { - assert := With(t) - - p := NewPool(32) - b := p.Allocate() - assert(b.Len(), Equals, 0) - - assert(b.AppendSupplier(ReadFrom(rand.Reader)), IsNil) - assert(b.Len(), Equals, 32) - - b.Release() -} - func BenchmarkNewBuffer(b *testing.B) { for i := 0; i < b.N; i++ { buffer := New() diff --git a/common/buf/reader.go b/common/buf/reader.go index 45dc4ef32..f040a9371 100644 --- a/common/buf/reader.go +++ b/common/buf/reader.go @@ -21,12 +21,13 @@ func NewBytesToBufferReader(reader io.Reader) Reader { const mediumSize = 8 * 1024 const largeSize = 64 * 1024 +const xlSize = 128 * 1024 func (r *BytesToBufferReader) readSmall() (MultiBuffer, error) { b := New() err := b.Reset(ReadFrom(r.Reader)) if b.IsFull() { - r.buffer = make([]byte, mediumSize) + r.buffer = NewBytes(mediumSize) } if !b.IsEmpty() { return NewMultiBufferValue(b), nil @@ -45,11 +46,14 @@ func (r *BytesToBufferReader) ReadMultiBuffer() (MultiBuffer, error) { if nBytes > 0 { mb := NewMultiBufferCap(nBytes/Size + 1) mb.Write(r.buffer[:nBytes]) - if nBytes == len(r.buffer) && len(r.buffer) == mediumSize { - r.buffer = make([]byte, largeSize) + if nBytes == len(r.buffer) && nBytes < xlSize { + FreeBytes(r.buffer) + r.buffer = NewBytes(uint32(nBytes) + 1) } return mb, nil } + FreeBytes(r.buffer) + r.buffer = nil return nil, err } diff --git a/common/crypto/auth.go b/common/crypto/auth.go index 2d591bfbd..e1bf8d0d9 100644 --- a/common/crypto/auth.go +++ b/common/crypto/auth.go @@ -128,7 +128,7 @@ func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) { if size <= buf.Size { b = buf.New() } else { - b = buf.NewLocal(size) + b = buf.NewLocal(uint32(size)) } if err := b.Reset(buf.ReadFullFrom(r.reader, size)); err != nil { b.Release() diff --git a/proxy/shadowsocks/ota.go b/proxy/shadowsocks/ota.go index 3d9396822..bca2eaca7 100644 --- a/proxy/shadowsocks/ota.go +++ b/proxy/shadowsocks/ota.go @@ -81,7 +81,7 @@ func (v *ChunkReader) ReadMultiBuffer() (buf.MultiBuffer, error) { if length > buf.Size { // Theoretically the size of a chunk is 64K, but most Shadowsocks implementations used <4K buffer. buffer.Release() - buffer = buf.NewLocal(int(length) + 128) + buffer = buf.NewLocal(uint32(length) + 128) } buffer.Clear()