From 746580d56613e279ff54a16c358795d857cd8faa Mon Sep 17 00:00:00 2001 From: V2Ray Date: Thu, 8 Oct 2015 23:06:12 +0200 Subject: [PATCH] Use buffer in socks proxy --- common/alloc/buffer.go | 35 ++++++++-- proxy/socks/protocol/socks.go | 105 ++++++++++++++-------------- proxy/socks/protocol/socks4.go | 14 ++-- proxy/socks/protocol/socks4_test.go | 9 ++- proxy/socks/protocol/socks_test.go | 10 ++- proxy/socks/protocol/udp.go | 4 +- proxy/socks/socks.go | 22 ++++-- proxy/vmess/vmessin.go | 2 +- 8 files changed, 119 insertions(+), 82 deletions(-) diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 8d241fc35..e950a448b 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -17,26 +17,39 @@ func (b *Buffer) Release() { b.pool = nil } -func (b *Buffer) Clear() { +func (b *Buffer) Clear() *Buffer { b.Value = b.head[:0] + return b } -func (b *Buffer) Append(data []byte) { +func (b *Buffer) AppendBytes(bytes ...byte) *Buffer { + b.Value = append(b.Value, bytes...) + return b +} + +func (b *Buffer) Append(data []byte) *Buffer { b.Value = append(b.Value, data...) + return b } -func (b *Buffer) Slice(from, to int) { +func (b *Buffer) Slice(from, to int) *Buffer { b.Value = b.Value[from:to] + return b } -func (b *Buffer) SliceFrom(from int) { +func (b *Buffer) SliceFrom(from int) *Buffer { b.Value = b.Value[from:] + return b } func (b *Buffer) Len() int { return len(b.Value) } +func (b *Buffer) IsFull() bool { + return len(b.Value) == cap(b.Value) +} + type bufferPool struct { chain chan []byte bufferSize int @@ -90,8 +103,18 @@ func (p *bufferPool) cleanup(tick <-chan time.Time) { } } -var smallPool = newBufferPool(8*1024, 256, 2048) +var smallPool = newBufferPool(1024, 16, 64) +var mediumPool = newBufferPool(8*1024, 256, 2048) +var largePool = newBufferPool(64*1024, 16, 64) -func NewBuffer() *Buffer { +func NewSmallBuffer() *Buffer { return smallPool.allocate() } + +func NewBuffer() *Buffer { + return mediumPool.allocate() +} + +func NewLargeBuffer() *Buffer { + return largePool.allocate() +} diff --git a/proxy/socks/protocol/socks.go b/proxy/socks/protocol/socks.go index 61e027a3f..c39e4b3e8 100644 --- a/proxy/socks/protocol/socks.go +++ b/proxy/socks/protocol/socks.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "io" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/errors" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -39,8 +40,10 @@ func (request *Socks5AuthenticationRequest) HasAuthMethod(method byte) bool { } func ReadAuthentication(reader io.Reader) (auth Socks5AuthenticationRequest, auth4 Socks4AuthenticationRequest, err error) { - buffer := make([]byte, 256) - nBytes, err := reader.Read(buffer) + buffer := alloc.NewSmallBuffer() + defer buffer.Release() + + nBytes, err := reader.Read(buffer.Value) if err != nil { return } @@ -50,22 +53,22 @@ func ReadAuthentication(reader io.Reader) (auth Socks5AuthenticationRequest, aut return } - if buffer[0] == socks4Version { - auth4.Version = buffer[0] - auth4.Command = buffer[1] - auth4.Port = binary.BigEndian.Uint16(buffer[2:4]) - copy(auth4.IP[:], buffer[4:8]) + if buffer.Value[0] == socks4Version { + auth4.Version = buffer.Value[0] + auth4.Command = buffer.Value[1] + auth4.Port = binary.BigEndian.Uint16(buffer.Value[2:4]) + copy(auth4.IP[:], buffer.Value[4:8]) err = NewSocksVersion4Error() return } - auth.version = buffer[0] + auth.version = buffer.Value[0] if auth.version != socksVersion { err = errors.NewProtocolVersionError(int(auth.version)) return } - auth.nMethods = buffer[1] + auth.nMethods = buffer.Value[1] if auth.nMethods <= 0 { log.Info("Zero length of authentication methods") err = errors.NewCorruptedPacketError() @@ -77,7 +80,7 @@ func ReadAuthentication(reader io.Reader) (auth Socks5AuthenticationRequest, aut err = errors.NewCorruptedPacketError() return } - copy(auth.authMethods[:], buffer[2:nBytes]) + copy(auth.authMethods[:], buffer.Value[2:nBytes]) return } @@ -113,29 +116,31 @@ func (request Socks5UserPassRequest) AuthDetail() string { } func ReadUserPassRequest(reader io.Reader) (request Socks5UserPassRequest, err error) { - buffer := make([]byte, 256) - _, err = reader.Read(buffer[0:2]) - if err != nil { - return - } - request.version = buffer[0] - nUsername := buffer[1] - nBytes, err := reader.Read(buffer[:nUsername]) - if err != nil { - return - } - request.username = string(buffer[:nBytes]) + buffer := alloc.NewSmallBuffer() + defer buffer.Release() - _, err = reader.Read(buffer[0:1]) + _, err = reader.Read(buffer.Value[0:2]) if err != nil { return } - nPassword := buffer[0] - nBytes, err = reader.Read(buffer[:nPassword]) + request.version = buffer.Value[0] + nUsername := buffer.Value[1] + nBytes, err := reader.Read(buffer.Value[:nUsername]) if err != nil { return } - request.password = string(buffer[:nBytes]) + request.username = string(buffer.Value[:nBytes]) + + _, err = reader.Read(buffer.Value[0:1]) + if err != nil { + return + } + nPassword := buffer.Value[0] + nBytes, err = reader.Read(buffer.Value[:nPassword]) + if err != nil { + return + } + request.password = string(buffer.Value[:nBytes]) return } @@ -177,8 +182,10 @@ type Socks5Request struct { } func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { - buffer := make([]byte, 256) - nBytes, err := reader.Read(buffer[:4]) + buffer := alloc.NewSmallBuffer() + defer buffer.Release() + + nBytes, err := reader.Read(buffer.Value[:4]) if err != nil { return } @@ -187,10 +194,10 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { return } request = &Socks5Request{ - Version: buffer[0], - Command: buffer[1], + Version: buffer.Value[0], + Command: buffer.Value[1], // buffer[2] is a reserved field - AddrType: buffer[3], + AddrType: buffer.Value[3], } switch request.AddrType { case AddrTypeIPv4: @@ -203,12 +210,12 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { return } case AddrTypeDomain: - nBytes, err = reader.Read(buffer[0:1]) + nBytes, err = reader.Read(buffer.Value[0:1]) if err != nil { return } - domainLength := buffer[0] - nBytes, err = reader.Read(buffer[:domainLength]) + domainLength := buffer.Value[0] + nBytes, err = reader.Read(buffer.Value[:domainLength]) if err != nil { return } @@ -218,7 +225,7 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { err = errors.NewCorruptedPacketError() return } - request.Domain = string(buffer[:domainLength]) + request.Domain = string(buffer.Value[:domainLength]) case AddrTypeIPv6: nBytes, err = reader.Read(request.IPv6[:]) if err != nil { @@ -234,7 +241,7 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { return } - nBytes, err = reader.Read(buffer[:2]) + nBytes, err = reader.Read(buffer.Value[:2]) if err != nil { return } @@ -243,7 +250,7 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { return } - request.Port = binary.BigEndian.Uint16(buffer) + request.Port = binary.BigEndian.Uint16(buffer.Value[:2]) return } @@ -305,26 +312,16 @@ func (r *Socks5Response) SetDomain(domain string) { r.Domain = domain } -func (r *Socks5Response) toBytes() []byte { - buffer := make([]byte, 0, 300) - buffer = append(buffer, r.Version) - buffer = append(buffer, r.Error) - buffer = append(buffer, 0x00) // reserved - buffer = append(buffer, r.AddrType) +func (r *Socks5Response) Write(buffer *alloc.Buffer) { + buffer.AppendBytes(r.Version, r.Error, 0x00 /* reserved */, r.AddrType) switch r.AddrType { case 0x01: - buffer = append(buffer, r.IPv4[:]...) + buffer.Append(r.IPv4[:]) case 0x03: - buffer = append(buffer, byte(len(r.Domain))) - buffer = append(buffer, []byte(r.Domain)...) + buffer.AppendBytes(byte(len(r.Domain))) + buffer.Append([]byte(r.Domain)) case 0x04: - buffer = append(buffer, r.IPv6[:]...) + buffer.Append(r.IPv6[:]) } - buffer = append(buffer, byte(r.Port>>8), byte(r.Port)) - return buffer -} - -func WriteResponse(writer io.Writer, response *Socks5Response) error { - _, err := writer.Write(response.toBytes()) - return err + buffer.AppendBytes(byte(r.Port>>8), byte(r.Port)) } diff --git a/proxy/socks/protocol/socks4.go b/proxy/socks/protocol/socks4.go index 4e14b69f3..0b5fa841d 100644 --- a/proxy/socks/protocol/socks4.go +++ b/proxy/socks/protocol/socks4.go @@ -1,6 +1,7 @@ package protocol import ( + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/errors" ) @@ -39,13 +40,8 @@ func NewSocks4AuthenticationResponse(result byte, port uint16, ip []byte) *Socks } } -func (r *Socks4AuthenticationResponse) ToBytes(buffer []byte) []byte { - if buffer == nil { - buffer = make([]byte, 8) - } - buffer[1] = r.result - buffer[2] = byte(r.port >> 8) - buffer[3] = byte(r.port) - copy(buffer[4:], r.ip) - return buffer +func (r *Socks4AuthenticationResponse) Write(buffer *alloc.Buffer) { + buffer.AppendBytes( + byte(0x00), r.result, byte(r.port>>8), byte(r.port), + r.ip[0], r.ip[1], r.ip[2], r.ip[3]) } diff --git a/proxy/socks/protocol/socks4_test.go b/proxy/socks/protocol/socks4_test.go index 0b1e16b0c..eb45aecba 100644 --- a/proxy/socks/protocol/socks4_test.go +++ b/proxy/socks/protocol/socks4_test.go @@ -4,6 +4,7 @@ import ( "bytes" "testing" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/testing/unit" ) @@ -32,6 +33,10 @@ func TestSocks4AuthenticationResponseToBytes(t *testing.T) { port: 443, ip: []byte{1, 2, 3, 4}, } - responseBytes := response.ToBytes(nil) - assert.Bytes(responseBytes).Equals([]byte{0x00, 0x10, 0x01, 0xBB, 0x01, 0x02, 0x03, 0x04}) + + buffer := alloc.NewSmallBuffer().Clear() + defer buffer.Release() + + response.Write(buffer) + assert.Bytes(buffer.Value).Equals([]byte{0x00, 0x10, 0x01, 0xBB, 0x01, 0x02, 0x03, 0x04}) } diff --git a/proxy/socks/protocol/socks_test.go b/proxy/socks/protocol/socks_test.go index 7310f362f..4bce563e6 100644 --- a/proxy/socks/protocol/socks_test.go +++ b/proxy/socks/protocol/socks_test.go @@ -4,6 +4,7 @@ import ( "bytes" "testing" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/testing/unit" ) @@ -67,7 +68,7 @@ func TestRequestRead(t *testing.T) { assert.Uint16(request.Port).Named("Port").Equals(53) } -func TestResponseToBytes(t *testing.T) { +func TestResponseWrite(t *testing.T) { assert := unit.Assert(t) response := Socks5Response{ @@ -79,7 +80,10 @@ func TestResponseToBytes(t *testing.T) { [16]byte{}, uint16(53), } - rawResponse := response.toBytes() + buffer := alloc.NewSmallBuffer().Clear() + defer buffer.Release() + + response.Write(buffer) expectedBytes := []byte{ socksVersion, ErrorSuccess, @@ -88,5 +92,5 @@ func TestResponseToBytes(t *testing.T) { 0x72, 0x72, 0x72, 0x72, byte(0x00), byte(0x035), } - assert.Bytes(rawResponse).Named("raw response").Equals(expectedBytes) + assert.Bytes(buffer.Value).Named("raw response").Equals(expectedBytes) } diff --git a/proxy/socks/protocol/udp.go b/proxy/socks/protocol/udp.go index 5d52afa58..be9fc84df 100644 --- a/proxy/socks/protocol/udp.go +++ b/proxy/socks/protocol/udp.go @@ -75,9 +75,7 @@ func ReadUDPRequest(packet []byte) (request Socks5UDPRequest, err error) { return } - request.Data = alloc.NewBuffer() - request.Data.Clear() - request.Data.Append(packet[dataBegin:]) + request.Data = alloc.NewBuffer().Clear().Append(packet[dataBegin:]) return } diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 9e734e428..c16ede8ee 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -133,7 +133,10 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W if request.Command == protocol.CmdBind || request.Command == protocol.CmdUdpAssociate { response := protocol.NewSocks5Response() response.Error = protocol.ErrorCommandNotSupported - err = protocol.WriteResponse(writer, response) + + responseBuffer := alloc.NewSmallBuffer().Clear() + response.Write(responseBuffer) + _, err = writer.Write(responseBuffer.Value) if err != nil { log.Error("Socks failed to write response: %v", err) return err @@ -152,7 +155,9 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W response.IPv4[2] = 0 response.IPv4[3] = 0 - err = protocol.WriteResponse(writer, response) + responseBuffer := alloc.NewSmallBuffer().Clear() + response.Write(responseBuffer) + _, err = writer.Write(responseBuffer.Value) if err != nil { log.Error("Socks failed to write response: %v", err) return err @@ -187,7 +192,12 @@ func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writ response.AddrType = protocol.AddrTypeDomain response.Domain = udpAddr.Domain() } - err := protocol.WriteResponse(writer, response) + + responseBuffer := alloc.NewSmallBuffer() + response.Write(responseBuffer) + _, err := writer.Write(responseBuffer.Value) + responseBuffer.Release() + if err != nil { log.Error("Socks failed to write response: %v", err) return err @@ -209,7 +219,11 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth result = protocol.Socks4RequestRejected } socks4Response := protocol.NewSocks4AuthenticationResponse(result, auth.Port, auth.IP[:]) - writer.Write(socks4Response.ToBytes(nil)) + + responseBuffer := alloc.NewSmallBuffer().Clear() + socks4Response.Write(responseBuffer) + writer.Write(responseBuffer.Value) + responseBuffer.Release() if result == protocol.Socks4RequestRejected { return errors.NewInvalidOperationError("Socks4 command " + strconv.Itoa(int(auth.Command))) diff --git a/proxy/vmess/vmessin.go b/proxy/vmess/vmessin.go index bda9dc88b..c4ae3e1a6 100644 --- a/proxy/vmess/vmessin.go +++ b/proxy/vmess/vmessin.go @@ -91,7 +91,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er } // Optimize for small response packet - buffer := alloc.NewBuffer() + buffer := alloc.NewLargeBuffer() buffer.Clear() buffer.Append(request.ResponseHeader)