diff --git a/app/proxyman/mux/reader.go b/app/proxyman/mux/reader.go index 111bbee2b..d91a31ff1 100644 --- a/app/proxyman/mux/reader.go +++ b/app/proxyman/mux/reader.go @@ -51,7 +51,11 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) { return nil, err } - b := buf.NewSize(int32(size)) + if size > buf.Size { + return nil, newError("packet size too large: ", size) + } + + b := buf.New() if err := b.Reset(buf.ReadFullFrom(r.reader, int32(size))); err != nil { b.Release() return nil, err diff --git a/common/buf/buffer.go b/common/buf/buffer.go index bc1743c68..f716c5564 100644 --- a/common/buf/buffer.go +++ b/common/buf/buffer.go @@ -2,6 +2,13 @@ package buf import ( "io" + + "v2ray.com/core/common/bytespool" +) + +const ( + // Size of a regular buffer. + Size = 2048 ) // Supplier is a writer that writes contents into the given buffer. @@ -11,8 +18,7 @@ type Supplier func([]byte) (int, error) // the buffer into an internal buffer pool, in order to recreate a buffer more // quickly. type Buffer struct { - v []byte - + v []byte start int32 end int32 } @@ -22,10 +28,9 @@ func (b *Buffer) Release() { if b == nil || b.v == nil { return } - freeBytes(b.v) + bytespool.Free(b.v) b.v = nil - b.start = 0 - b.end = 0 + b.Clear() } // Clear clears the content of the buffer, results an empty buffer with @@ -167,13 +172,6 @@ func (b *Buffer) String() string { // New creates a Buffer with 0 length and 2K capacity. func New() *Buffer { return &Buffer{ - v: pool[0].Get().([]byte), - } -} - -// NewSize creates and returns a buffer with 0 length and at least the given capacity. Capacity must be positive. -func NewSize(capacity int32) *Buffer { - return &Buffer{ - v: newBytes(capacity), + v: bytespool.Alloc(Size), } } diff --git a/common/buf/buffer_test.go b/common/buf/buffer_test.go index d753cbdb4..80f8b5304 100644 --- a/common/buf/buffer_test.go +++ b/common/buf/buffer_test.go @@ -47,10 +47,3 @@ func BenchmarkNewBuffer(b *testing.B) { buffer.Release() } } - -func BenchmarkNewLocalBuffer(b *testing.B) { - for i := 0; i < b.N; i++ { - buffer := NewSize(Size) - buffer.Release() - } -} diff --git a/common/buf/multi_buffer.go b/common/buf/multi_buffer.go index 3816d6264..1aec0d464 100644 --- a/common/buf/multi_buffer.go +++ b/common/buf/multi_buffer.go @@ -229,7 +229,7 @@ func (mb *MultiBuffer) SliceBySize(size int32) MultiBuffer { } *mb = (*mb)[endIndex:] if endIndex == 0 && len(*mb) > 0 { - b := NewSize(size) + b := New() common.Must(b.Reset(ReadFullFrom((*mb)[0], size))) return NewMultiBufferValue(b) } diff --git a/common/buf/multi_buffer_test.go b/common/buf/multi_buffer_test.go index a3982704f..84592792f 100644 --- a/common/buf/multi_buffer_test.go +++ b/common/buf/multi_buffer_test.go @@ -40,14 +40,14 @@ func TestMultiBufferAppend(t *testing.T) { func TestMultiBufferSliceBySizeLarge(t *testing.T) { assert := With(t) - lb := NewSize(8 * 1024) - common.Must(lb.Reset(ReadFrom(rand.Reader))) + lb := make([]byte, 8*1024) + common.Must2(io.ReadFull(rand.Reader, lb)) var mb MultiBuffer - mb.Append(lb) + common.Must2(mb.Write(lb)) - mb2 := mb.SliceBySize(4 * 1024) - assert(mb2.Len(), Equals, int32(4*1024)) + mb2 := mb.SliceBySize(1024) + assert(mb2.Len(), Equals, int32(1024)) } func TestInterface(t *testing.T) { diff --git a/common/buf/reader.go b/common/buf/reader.go index fbe263104..02450bc3c 100644 --- a/common/buf/reader.go +++ b/common/buf/reader.go @@ -4,6 +4,7 @@ import ( "io" "v2ray.com/core/common" + "v2ray.com/core/common/bytespool" "v2ray.com/core/common/errors" ) @@ -23,6 +24,8 @@ func readOne(r io.Reader) (*Buffer, error) { return nil, newError("Reader returns too many empty payloads.") } +const largeSize = 128 * 1024 + // BytesToBufferReader is a Reader that adjusts its reading speed automatically. type BytesToBufferReader struct { io.Reader @@ -42,13 +45,13 @@ func (r *BytesToBufferReader) readSmall() (MultiBuffer, error) { return nil, err } if b.IsFull() && largeSize > Size { - r.buffer = newBytes(Size + 1) + r.buffer = bytespool.Alloc(Size + 100) } return NewMultiBufferValue(b), nil } func (r *BytesToBufferReader) freeBuffer() { - freeBytes(r.buffer) + bytespool.Free(r.buffer) r.buffer = nil } @@ -63,8 +66,8 @@ func (r *BytesToBufferReader) ReadMultiBuffer() (MultiBuffer, error) { mb := NewMultiBufferCap(int32(nBytes/Size) + 1) common.Must2(mb.Write(r.buffer[:nBytes])) if nBytes == len(r.buffer) && nBytes < int(largeSize) { - freeBytes(r.buffer) - r.buffer = newBytes(int32(nBytes) + 1) + bytespool.Free(r.buffer) + r.buffer = bytespool.Alloc(int32(nBytes) + 100) } else if nBytes < Size { r.freeBuffer() } diff --git a/common/buf/reader_test.go b/common/buf/reader_test.go index 3a4d8d5d5..c66744514 100644 --- a/common/buf/reader_test.go +++ b/common/buf/reader_test.go @@ -16,7 +16,7 @@ func TestAdaptiveReader(t *testing.T) { reader := NewReader(bytes.NewReader(make([]byte, 1024*1024))) b, err := reader.ReadMultiBuffer() assert(err, IsNil) - assert(b.Len(), Equals, int32(2*1024)) + assert(b.Len(), Equals, int32(Size)) b, err = reader.ReadMultiBuffer() assert(err, IsNil) diff --git a/common/buf/buffer_pool.go b/common/bytespool/pool.go similarity index 74% rename from common/buf/buffer_pool.go rename to common/bytespool/pool.go index 511e36fa7..11668a55f 100644 --- a/common/buf/buffer_pool.go +++ b/common/bytespool/pool.go @@ -1,13 +1,6 @@ -package buf +package bytespool -import ( - "sync" -) - -const ( - // Size of a regular buffer. - Size = 2 * 1024 -) +import "sync" func createAllocFunc(size int32) func() interface{} { return func() interface{} { @@ -25,24 +18,23 @@ const ( ) var ( - pool [numPools]sync.Pool - poolSize [numPools]int32 - largeSize int32 + pool [numPools]sync.Pool + poolSize [numPools]int32 ) func init() { - size := int32(Size) + size := int32(2048) for i := 0; i < numPools; i++ { pool[i] = sync.Pool{ New: createAllocFunc(size), } poolSize[i] = size - largeSize = size size *= sizeMulti } } -func newBytes(size int32) []byte { +// Alloc returns a byte slice with at least the given size. Minimum size of returned slice is 2048. +func Alloc(size int32) []byte { for idx, ps := range poolSize { if size <= ps { return pool[idx].Get().([]byte) @@ -51,7 +43,8 @@ func newBytes(size int32) []byte { return make([]byte, size) } -func freeBytes(b []byte) { +// Free puts a byte slice into the internal pool. +func Free(b []byte) { size := int32(cap(b)) b = b[0:cap(b)] for i := numPools - 1; i >= 0; i-- { diff --git a/common/crypto/auth.go b/common/crypto/auth.go index cc623e52c..bf1c66645 100644 --- a/common/crypto/auth.go +++ b/common/crypto/auth.go @@ -7,6 +7,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" + "v2ray.com/core/common/bytespool" "v2ray.com/core/common/protocol" ) @@ -122,62 +123,85 @@ func (r *AuthenticationReader) readSize() (uint16, uint16, error) { var errSoft = newError("waiting for more data") -func (r *AuthenticationReader) readInternal(soft bool) (*buf.Buffer, error) { +func (r *AuthenticationReader) readBuffer(size int32, padding int32) (*buf.Buffer, error) { + b := buf.New() + if err := b.Reset(buf.ReadFullFrom(r.reader, size)); err != nil { + b.Release() + return nil, err + } + size -= padding + rb, err := r.auth.Open(b.BytesTo(0), b.BytesTo(size)) + if err != nil { + b.Release() + return nil, err + } + b.Resize(0, int32(len(rb))) + return b, nil +} + +func (r *AuthenticationReader) readInternal(soft bool, mb *buf.MultiBuffer) error { if soft && r.reader.BufferedBytes() < r.sizeParser.SizeBytes() { - return nil, errSoft + return errSoft } if r.done { - return nil, io.EOF + return io.EOF } size, padding, err := r.readSize() if err != nil { - return nil, err + return err } if size == uint16(r.auth.Overhead())+padding { r.done = true - return nil, io.EOF + return io.EOF } if soft && int32(size) > r.reader.BufferedBytes() { r.size = size r.paddingLen = padding r.hasSize = true - return nil, errSoft + return errSoft } - b := buf.NewSize(int32(size)) - if err := b.Reset(buf.ReadFullFrom(r.reader, int32(size))); err != nil { - b.Release() - return nil, err + if size <= buf.Size { + b, err := r.readBuffer(int32(size), int32(padding)) + if err != nil { + return nil + } + mb.Append(b) + return nil + } + + payload := bytespool.Alloc(int32(size)) + defer bytespool.Free(payload) + + if _, err := io.ReadFull(r.reader, payload[:size]); err != nil { + return err } size -= padding - rb, err := r.auth.Open(b.BytesTo(0), b.BytesTo(int32(size))) + rb, err := r.auth.Open(payload[:0], payload[:size]) if err != nil { - b.Release() - return nil, err + return err } - b.Resize(0, int32(len(rb))) - return b, nil + common.Must2(mb.Write(rb)) + return nil } func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) { - b, err := r.readInternal(false) - if err != nil { + const readSize = 16 + mb := buf.NewMultiBufferCap(readSize) + if err := r.readInternal(false, &mb); err != nil { + mb.Release() return nil, err } - const readSize = 16 - mb := buf.NewMultiBufferCap(readSize) - mb.Append(b) - for i := 1; i < readSize; i++ { - b, err := r.readInternal(true) + err := r.readInternal(true, &mb) if err == errSoft || err == io.EOF { break } @@ -185,7 +209,6 @@ func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) { mb.Release() return nil, err } - mb.Append(b) } return mb, nil diff --git a/common/crypto/auth_test.go b/common/crypto/auth_test.go index ea16cfcc9..3e4d367a0 100644 --- a/common/crypto/auth_test.go +++ b/common/crypto/auth_test.go @@ -1,6 +1,7 @@ package crypto_test import ( + "bytes" "crypto/aes" "crypto/cipher" "crypto/rand" @@ -29,11 +30,11 @@ func TestAuthenticationReaderWriter(t *testing.T) { rawPayload := make([]byte, payloadSize) rand.Read(rawPayload) - payload := buf.NewSize(payloadSize) + var payload buf.MultiBuffer payload.Write(rawPayload) assert(payload.Len(), Equals, int32(payloadSize)) - cache := buf.NewSize(160 * 1024) + cache := bytes.NewBuffer(nil) iv := make([]byte, 12) rand.Read(iv) @@ -43,8 +44,8 @@ func TestAuthenticationReaderWriter(t *testing.T) { AdditionalDataGenerator: GenerateEmptyBytes(), }, PlainChunkSizeParser{}, cache, protocol.TransferTypeStream, nil) - assert(writer.WriteMultiBuffer(buf.NewMultiBufferValue(payload)), IsNil) - assert(cache.Len(), Equals, int32(82658)) + assert(writer.WriteMultiBuffer(payload), IsNil) + assert(cache.Len(), Equals, int(82658)) assert(writer.WriteMultiBuffer(buf.MultiBuffer{}), IsNil) reader := NewAuthenticationReader(&AEADAuthenticator{ @@ -83,7 +84,7 @@ func TestAuthenticationReaderWriterPacket(t *testing.T) { aead, err := cipher.NewGCM(block) assert(err, IsNil) - cache := buf.NewSize(1024) + cache := buf.New() iv := make([]byte, 12) rand.Read(iv) diff --git a/common/crypto/chunk_test.go b/common/crypto/chunk_test.go index c8df17cab..47402de18 100644 --- a/common/crypto/chunk_test.go +++ b/common/crypto/chunk_test.go @@ -1,6 +1,7 @@ package crypto_test import ( + "bytes" "io" "testing" @@ -13,7 +14,7 @@ import ( func TestChunkStreamIO(t *testing.T) { assert := With(t) - cache := buf.NewSize(8192) + cache := bytes.NewBuffer(make([]byte, 0, 8192)) writer := NewChunkStreamWriter(PlainChunkSizeParser{}, cache) reader := NewChunkStreamReader(PlainChunkSizeParser{}, cache) diff --git a/proxy/shadowsocks/ota.go b/proxy/shadowsocks/ota.go index e3fd834df..c0d101873 100644 --- a/proxy/shadowsocks/ota.go +++ b/proxy/shadowsocks/ota.go @@ -8,6 +8,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" + "v2ray.com/core/common/bytespool" "v2ray.com/core/common/serial" ) @@ -76,24 +77,26 @@ func (v *ChunkReader) ReadMultiBuffer() (buf.MultiBuffer, error) { } size += AuthSize - buffer := buf.NewSize(int32(size)) - if err := buffer.AppendSupplier(buf.ReadFullFrom(v.reader, int32(size))); err != nil { - buffer.Release() + buffer := bytespool.Alloc(int32(size)) + defer bytespool.Free(buffer) + + if _, err := io.ReadFull(v.reader, buffer[:size]); err != nil { return nil, err } - authBytes := buffer.BytesTo(AuthSize) - payload := buffer.BytesFrom(AuthSize) + authBytes := buffer[:AuthSize] + payload := buffer[AuthSize:size] actualAuthBytes := make([]byte, AuthSize) v.auth.Authenticate(payload)(actualAuthBytes) if !bytes.Equal(authBytes, actualAuthBytes) { - buffer.Release() return nil, newError("invalid auth") } - buffer.Advance(AuthSize) - return buf.NewMultiBufferValue(buffer), nil + var mb buf.MultiBuffer + common.Must2(mb.Write(payload)) + + return mb, nil } type ChunkWriter struct { diff --git a/proxy/shadowsocks/ota_test.go b/proxy/shadowsocks/ota_test.go index fc8c44228..3bd11b865 100644 --- a/proxy/shadowsocks/ota_test.go +++ b/proxy/shadowsocks/ota_test.go @@ -24,11 +24,11 @@ func TestNormalChunkReading(t *testing.T) { func TestNormalChunkWriting(t *testing.T) { assert := With(t) - buffer := buf.NewSize(512) + buffer := buf.New() writer := NewChunkWriter(buffer, NewAuthenticator(ChunkKeyGenerator( []byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}))) - b := buf.NewSize(256) + b := buf.New() b.Write([]byte{11, 12, 13, 14, 15, 16, 17, 18}) err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) assert(err, IsNil) diff --git a/proxy/shadowsocks/protocol_test.go b/proxy/shadowsocks/protocol_test.go index 673444dd8..88ef221ba 100644 --- a/proxy/shadowsocks/protocol_test.go +++ b/proxy/shadowsocks/protocol_test.go @@ -3,6 +3,7 @@ package shadowsocks_test import ( "testing" + "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" @@ -29,7 +30,7 @@ func TestUDPEncoding(t *testing.T) { }, } - data := buf.NewSize(256) + data := buf.New() data.AppendSupplier(serial.WriteString("test string")) encodedData, err := EncodeUDPPacket(request, data.Bytes()) assert(err, IsNil) @@ -104,8 +105,7 @@ func TestTCPRequest(t *testing.T) { runTest := func(request *protocol.RequestHeader, payload []byte) { data := buf.New() - defer data.Release() - data.Write(payload) + common.Must2(data.Write(payload)) cache := buf.New() defer cache.Release() @@ -142,6 +142,8 @@ func TestUDPReaderWriter(t *testing.T) { }), } cache := buf.New() + defer cache.Release() + writer := &buf.SequentialWriter{Writer: &UDPWriter{ Writer: cache, Request: &protocol.RequestHeader{ @@ -158,21 +160,25 @@ func TestUDPReaderWriter(t *testing.T) { User: user, } - b := buf.New() - b.AppendSupplier(serial.WriteString("test payload")) - err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) - assert(err, IsNil) + { + b := buf.New() + b.AppendSupplier(serial.WriteString("test payload")) + err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) + assert(err, IsNil) - payload, err := reader.ReadMultiBuffer() - assert(err, IsNil) - assert(payload[0].String(), Equals, "test payload") + payload, err := reader.ReadMultiBuffer() + assert(err, IsNil) + assert(payload[0].String(), Equals, "test payload") + } - b = buf.New() - b.AppendSupplier(serial.WriteString("test payload 2")) - err = writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) - assert(err, IsNil) + { + b := buf.New() + b.AppendSupplier(serial.WriteString("test payload 2")) + err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) + assert(err, IsNil) - payload, err = reader.ReadMultiBuffer() - assert(err, IsNil) - assert(payload[0].String(), Equals, "test payload 2") + payload, err := reader.ReadMultiBuffer() + assert(err, IsNil) + assert(payload[0].String(), Equals, "test payload 2") + } } diff --git a/testing/scenarios/tls_test.go b/testing/scenarios/tls_test.go index e269201e6..b36a4f98d 100644 --- a/testing/scenarios/tls_test.go +++ b/testing/scenarios/tls_test.go @@ -10,6 +10,7 @@ import ( "v2ray.com/core" "v2ray.com/core/app/proxyman" + "v2ray.com/core/common" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol/tls/cert" @@ -36,7 +37,7 @@ func TestSimpleTLSConnection(t *testing.T) { MsgProcessor: xor, } dest, err := tcpServer.Start() - assert(err, IsNil) + common.Must(err) defer tcpServer.Close() userID := protocol.NewID(uuid.New()) @@ -123,24 +124,25 @@ func TestSimpleTLSConnection(t *testing.T) { } servers, err := InitializeServerConfigs(serverConfig, clientConfig) - assert(err, IsNil) + common.Must(err) + defer CloseAllServers(servers) - conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: int(clientPort), - }) - assert(err, IsNil) + { + conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: int(clientPort), + }) + assert(err, IsNil) - payload := "dokodemo request." - nBytes, err := conn.Write([]byte(payload)) - assert(err, IsNil) - assert(nBytes, Equals, len(payload)) + payload := "dokodemo request." + nBytes, err := conn.Write([]byte(payload)) + assert(err, IsNil) + assert(nBytes, Equals, len(payload)) - response := readFrom(conn, time.Second*2, len(payload)) - assert(response, Equals, xor([]byte(payload))) - assert(conn.Close(), IsNil) - - CloseAllServers(servers) + response := readFrom(conn, time.Second*2, len(payload)) + assert(response, Equals, xor([]byte(payload))) + assert(conn.Close(), IsNil) + } } func TestAutoIssuingCertificate(t *testing.T) { diff --git a/transport/internet/headers/http/http_test.go b/transport/internet/headers/http/http_test.go index ff8b9c540..e23ca12c1 100644 --- a/transport/internet/headers/http/http_test.go +++ b/transport/internet/headers/http/http_test.go @@ -16,7 +16,7 @@ func TestReaderWriter(t *testing.T) { assert := With(t) cache := buf.New() - b := buf.NewSize(256) + b := buf.New() b.AppendSupplier(serial.WriteString("abcd" + ENDING)) writer := NewHeaderWriter(b) err := writer.Write(cache) diff --git a/transport/internet/kcp/io.go b/transport/internet/kcp/io.go index 7b7b0011f..6249a7e6c 100644 --- a/transport/internet/kcp/io.go +++ b/transport/internet/kcp/io.go @@ -70,7 +70,7 @@ func (w *KCPPacketWriter) Overhead() int { } func (w *KCPPacketWriter) Write(b []byte) (int, error) { - bb := buf.NewSize(int32(len(b) + w.Overhead())) + bb := buf.New() defer bb.Release() if w.Header != nil {