diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go new file mode 100644 index 000000000..2bae31387 --- /dev/null +++ b/common/alloc/buffer.go @@ -0,0 +1,104 @@ +package alloc + +import ( + //"fmt" + "time" +) + +type Buffer struct { + head []byte + pool *bufferPool + Value []byte +} + +func (b *Buffer) Release() { + b.pool.free(b) +} + +func (b *Buffer) Clear() { + b.Value = b.Value[:0] +} + +func (b *Buffer) Append(data []byte) { + b.Value = append(b.Value, data...) +} + +func (b *Buffer) Slice(from, to int) { + b.Value = b.Value[from:to] +} + +func (b *Buffer) SliceFrom(from int) { + b.Value = b.Value[from:] +} + +func (b *Buffer) Len() int { + return len(b.Value) +} + +type bufferPool struct { + chain chan *Buffer + allocator func(*bufferPool) *Buffer + minElements int + maxElements int +} + +func newBufferPool(allocator func(*bufferPool) *Buffer, minElements, maxElements int) *bufferPool { + pool := &bufferPool{ + chain: make(chan *Buffer, maxElements*2), + allocator: allocateSmall, + minElements: minElements, + maxElements: maxElements, + } + for i := 0; i < minElements; i++ { + pool.chain <- allocator(pool) + } + go pool.cleanup(time.Tick(1 * time.Second)) + return pool +} + +func (p *bufferPool) allocate() *Buffer { + //fmt.Printf("Pool size: %d\n", len(p.chain)) + var b *Buffer + select { + case b = <-p.chain: + default: + b = p.allocator(p) + } + b.Value = b.head + return b +} + +func (p *bufferPool) free(buffer *Buffer) { + select { + case p.chain <- buffer: + default: + } + //fmt.Printf("Pool size: %d\n", len(p.chain)) +} + +func (p *bufferPool) cleanup(tick <-chan time.Time) { + for range tick { + pSize := len(p.chain) + for delta := pSize - p.minElements; delta > 0; delta-- { + p.chain <- p.allocator(p) + } + for delta := p.maxElements - pSize; delta > 0; delta-- { + <-p.chain + } + } +} + +func allocateSmall(pool *bufferPool) *Buffer { + b := &Buffer{ + head: make([]byte, 8*1024), + } + b.Value = b.head + b.pool = pool + return b +} + +var smallPool = newBufferPool(allocateSmall, 256, 1024) + +func NewBuffer() *Buffer { + return smallPool.allocate() +} diff --git a/common/net/packet.go b/common/net/packet.go index 43f64f890..47d5dd677 100644 --- a/common/net/packet.go +++ b/common/net/packet.go @@ -1,12 +1,16 @@ package net +import ( + "github.com/v2ray/v2ray-core/common/alloc" +) + type Packet interface { Destination() Destination - Chunk() []byte // First chunk of this commnunication + Chunk() *alloc.Buffer // First chunk of this commnunication MoreChunks() bool } -func NewPacket(dest Destination, firstChunk []byte, moreChunks bool) Packet { +func NewPacket(dest Destination, firstChunk *alloc.Buffer, moreChunks bool) Packet { return &packetImpl{ dest: dest, data: firstChunk, @@ -16,7 +20,7 @@ func NewPacket(dest Destination, firstChunk []byte, moreChunks bool) Packet { type packetImpl struct { dest Destination - data []byte + data *alloc.Buffer moreData bool } @@ -24,7 +28,7 @@ func (packet *packetImpl) Destination() Destination { return packet.dest } -func (packet *packetImpl) Chunk() []byte { +func (packet *packetImpl) Chunk() *alloc.Buffer { return packet.data } diff --git a/common/net/transport.go b/common/net/transport.go index fa02af7eb..4e7185656 100644 --- a/common/net/transport.go +++ b/common/net/transport.go @@ -2,63 +2,41 @@ package net import ( "io" + + "github.com/v2ray/v2ray-core/common/alloc" ) -const ( - minBufferSizeKilo = 2 - maxBufferSizeKilo = 128 -) - -func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) { - buffer := make([]byte, sizeInKilo<<10) - nBytes, err := reader.Read(buffer) - if nBytes == 0 { - return nil, err +func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) { + if buffer == nil { + buffer = alloc.NewBuffer() } - return buffer[:nBytes], err -} - -func roundUp(size int) int { - if size <= minBufferSizeKilo { - return minBufferSizeKilo - } - if size >= maxBufferSizeKilo { - return maxBufferSizeKilo - } - size-- - size |= size >> 1 - size |= size >> 2 - size |= size >> 4 - return size + 1 + nBytes, err := reader.Read(buffer.Value) + buffer.Slice(0, nBytes) + return buffer, err } // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. -func ReaderToChan(stream chan<- []byte, reader io.Reader) error { - bufferSizeKilo := 2 +func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error { for { - data, err := ReadFrom(reader, bufferSizeKilo) - if len(data) > 0 { - stream <- data + buffer, err := ReadFrom(reader, nil) + if buffer.Len() > 0 { + stream <- buffer + } else { + buffer.Release() + buffer = nil } if err != nil { return err } - if bufferSizeKilo == maxBufferSizeKilo { - continue - } - dataLenKilo := len(data) >> 10 - if dataLenKilo == bufferSizeKilo { - bufferSizeKilo <<= 1 - } else { - bufferSizeKilo = roundUp(dataLenKilo) - } } } // ChanToWriter dumps all content from a given chan to a writer until the chan is closed. -func ChanToWriter(writer io.Writer, stream <-chan []byte) error { +func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error { for buffer := range stream { - _, err := writer.Write(buffer) + _, err := writer.Write(buffer.Value) + buffer.Release() + buffer = nil if err != nil { return err } diff --git a/common/net/transport_test.go b/common/net/transport_test.go index df8ec0dab..4c5492338 100644 --- a/common/net/transport_test.go +++ b/common/net/transport_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "testing" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/testing/unit" ) @@ -22,7 +23,7 @@ func TestReaderAndWrite(t *testing.T) { readerBuffer := bytes.NewReader(buffer) writerBuffer := bytes.NewBuffer(make([]byte, 0, size)) - transportChan := make(chan []byte, 1024) + transportChan := make(chan *alloc.Buffer, 1024) err = ReaderToChan(transportChan, readerBuffer) assert.Error(err).Equals(io.EOF) @@ -44,7 +45,7 @@ func (reader *StaticReader) Read(b []byte) (size int, err error) { if size > reader.total-reader.current { size = reader.total - reader.current } - for i := 0; i < len(b); i++ { + for i := 0; i < size; i++ { b[i] = byte(i) } //rand.Read(b[:size]) @@ -113,8 +114,8 @@ func BenchmarkTransport10M(b *testing.B) { func runBenchmarkTransport(size int) { - transportChanA := make(chan []byte, 16) - transportChanB := make(chan []byte, 16) + transportChanA := make(chan *alloc.Buffer, 16) + transportChanB := make(chan *alloc.Buffer, 16) readerA := &StaticReader{size, 0} readerB := &StaticReader{size, 0} diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 7fb55b707..2e6f6234e 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" ) @@ -33,7 +34,9 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.Outb writeMutex.Lock() if chunk := firstPacket.Chunk(); chunk != nil { - conn.Write(chunk) + conn.Write(chunk.Value) + chunk.Release() + chunk = nil } if !firstPacket.MoreChunks() { @@ -56,23 +59,22 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.Outb return nil } -func dumpInput(conn net.Conn, input <-chan []byte, finish *sync.Mutex) { +func dumpInput(conn net.Conn, input <-chan *alloc.Buffer, finish *sync.Mutex) { v2net.ChanToWriter(conn, input) finish.Unlock() } -func dumpOutput(conn net.Conn, output chan<- []byte, finish *sync.Mutex, udp bool) { +func dumpOutput(conn net.Conn, output chan<- *alloc.Buffer, finish *sync.Mutex, udp bool) { defer finish.Unlock() defer close(output) - bufferSize := 4 /* KB */ - if udp { - bufferSize = 2 - } - response, err := v2net.ReadFrom(conn, bufferSize) - log.Info("Freedom receives %d bytes from %s", len(response), conn.RemoteAddr().String()) - if len(response) > 0 { + response, err := v2net.ReadFrom(conn, nil) + log.Info("Freedom receives %d bytes from %s", response.Len(), conn.RemoteAddr().String()) + if response.Len() > 0 { output <- response + } else { + response.Release() + response = nil } if err != nil { return diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index 91e356e36..87f44c1f7 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -9,6 +9,7 @@ import ( "golang.org/x/net/proxy" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" _ "github.com/v2ray/v2ray-core/proxy/socks" "github.com/v2ray/v2ray-core/proxy/socks/config/json" @@ -62,8 +63,11 @@ func TestUDPSend(t *testing.T) { err = point.Start() assert.Error(err).IsNil() + data2SendBuffer := alloc.NewBuffer() + data2SendBuffer.Clear() + data2SendBuffer.Append([]byte(data2Send)) dest := v2net.NewUDPDestination(udpServerAddr) - ich.Communicate(v2net.NewPacket(dest, []byte(data2Send), false)) + ich.Communicate(v2net.NewPacket(dest, data2SendBuffer, false)) assert.Bytes(ich.DataReturned.Bytes()).Equals([]byte("Processed: Data to be sent to remote")) } diff --git a/proxy/socks/protocol/udp.go b/proxy/socks/protocol/udp.go index 88484e32f..5d52afa58 100644 --- a/proxy/socks/protocol/udp.go +++ b/proxy/socks/protocol/udp.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" ) @@ -15,7 +16,7 @@ var ( type Socks5UDPRequest struct { Fragment byte Address v2net.Address - Data []byte + Data *alloc.Buffer } func (request *Socks5UDPRequest) Destination() v2net.Destination { @@ -40,7 +41,7 @@ func (request *Socks5UDPRequest) Bytes(buffer []byte) []byte { buffer = append(buffer, []byte(request.Address.Domain())...) } buffer = append(buffer, request.Address.PortBytes()...) - buffer = append(buffer, request.Data...) + buffer = append(buffer, request.Data.Value...) return buffer } @@ -74,8 +75,9 @@ func ReadUDPRequest(packet []byte) (request Socks5UDPRequest, err error) { return } - request.Data = make([]byte, len(packet)-dataBegin) - copy(request.Data, packet[dataBegin:]) + request.Data = alloc.NewBuffer() + request.Data.Clear() + request.Data.Append(packet[dataBegin:]) return } diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index c97f0c6de..9e734e428 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -8,6 +8,7 @@ import ( "time" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/errors" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -158,7 +159,7 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W } dest := request.Destination() - data, err := v2net.ReadFrom(reader, 4) + data, err := v2net.ReadFrom(reader, nil) if err != nil { return err } @@ -192,8 +193,8 @@ func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writ return err } - reader.SetTimeOut(300) /* 5 minutes */ - v2net.ReadFrom(reader, 1) // Just in case of anything left in the socket + reader.SetTimeOut(300) /* 5 minutes */ + v2net.ReadFrom(reader, nil) // Just in case of anything left in the socket // The TCP connection closes after this method returns. We need to wait until // the client closes it. // TODO: get notified from UDP part @@ -215,7 +216,7 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth } dest := v2net.NewTCPDestination(v2net.IPAddress(auth.IP[:], auth.Port)) - data, err := v2net.ReadFrom(reader, 4) + data, err := v2net.ReadFrom(reader, nil) if err != nil { return err } @@ -239,13 +240,13 @@ func (server *SocksServer) transport(reader io.Reader, writer io.Writer, firstPa outputFinish.Lock() } -func dumpInput(reader io.Reader, input chan<- []byte, finish *sync.Mutex) { +func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { v2net.ReaderToChan(input, reader) finish.Unlock() close(input) } -func dumpOutput(writer io.Writer, output <-chan []byte, finish *sync.Mutex) { +func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) { v2net.ChanToWriter(writer, output) finish.Unlock() } diff --git a/proxy/socks/udp.go b/proxy/socks/udp.go index 408dea6c3..054e9b3b9 100644 --- a/proxy/socks/udp.go +++ b/proxy/socks/udp.go @@ -3,6 +3,7 @@ package socks import ( "net" + "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy/socks/protocol" @@ -38,14 +39,16 @@ func (server *SocksServer) getUDPAddr() v2net.Address { func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error { for { - buffer := make([]byte, bufferSize) - nBytes, addr, err := conn.ReadFromUDP(buffer) + buffer := alloc.NewBuffer() + defer buffer.Release() + nBytes, addr, err := conn.ReadFromUDP(buffer.Value) if err != nil { log.Error("Socks failed to read UDP packets: %v", err) continue } + buffer.Slice(0, nBytes) log.Info("Client UDP connection from %v", addr) - request, err := protocol.ReadUDPRequest(buffer[:nBytes]) + request, err := protocol.ReadUDPRequest(buffer.Value) if err != nil { log.Error("Socks failed to parse UDP request: %v", err) continue @@ -57,7 +60,7 @@ func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error { } udpPacket := v2net.NewPacket(request.Destination(), request.Data, false) - log.Info("Send packet to %s with %d bytes", udpPacket.Destination().String(), len(request.Data)) + log.Info("Send packet to %s with %d bytes", udpPacket.Destination().String(), request.Data.Len()) go server.handlePacket(conn, udpPacket, addr, request.Address) } } @@ -72,9 +75,11 @@ func (server *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, Address: targetAddr, Data: data, } - log.Info("Writing back UDP response with %d bytes from %s to %s", len(data), targetAddr.String(), clientAddr.String()) + log.Info("Writing back UDP response with %d bytes from %s to %s", data.Len(), targetAddr.String(), clientAddr.String()) udpMessage := response.Bytes(nil) nBytes, err := conn.WriteToUDP(udpMessage, clientAddr) + response.Data.Release() + response.Data = nil if err != nil { log.Error("Socks failed to write UDP message (%d bytes) to %s: %v", nBytes, clientAddr.String(), err) } diff --git a/proxy/vmess/vmess_test.go b/proxy/vmess/vmess_test.go index 38e3be9ce..6d9d0e45a 100644 --- a/proxy/vmess/vmess_test.go +++ b/proxy/vmess/vmess_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/testing/mocks" "github.com/v2ray/v2ray-core/testing/unit" @@ -163,8 +164,11 @@ func TestVMessInAndOutUDP(t *testing.T) { err = pointB.Start() assert.Error(err).IsNil() + data2SendBuffer := alloc.NewBuffer() + data2SendBuffer.Clear() + data2SendBuffer.Append([]byte(data2Send)) dest := v2net.NewUDPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}, 80)) - ich.Communicate(v2net.NewPacket(dest, []byte(data2Send), false)) + ich.Communicate(v2net.NewPacket(dest, data2SendBuffer, false)) assert.Bytes([]byte(data2Send)).Equals(och.Data2Send.Bytes()) assert.Bytes(ich.DataReturned.Bytes()).Equals(och.Data2Return) } diff --git a/proxy/vmess/vmessin.go b/proxy/vmess/vmessin.go index 9944cdef7..291669bda 100644 --- a/proxy/vmess/vmessin.go +++ b/proxy/vmess/vmessin.go @@ -5,9 +5,9 @@ import ( "io" "net" "sync" - "time" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -15,14 +15,6 @@ import ( "github.com/v2ray/v2ray-core/proxy/vmess/protocol/user" ) -const ( - requestReadTimeOut = 4 * time.Second -) - -var ( - zeroTime time.Time -) - type VMessInboundHandler struct { vPoint *core.Point clients user.UserSet @@ -103,7 +95,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er buffer = append(buffer, request.ResponseHeader...) if data, open := <-output; open { - buffer = append(buffer, data...) + buffer = append(buffer, data.Value...) data = nil responseWriter.Write(buffer) buffer = nil @@ -117,7 +109,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er return nil } -func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- []byte, finish *sync.Mutex) { +func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { defer close(input) defer finish.Unlock() @@ -130,7 +122,7 @@ func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- v2net.ReaderToChan(input, requestReader) } -func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan []byte, finish *sync.Mutex) { +func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) { v2net.ChanToWriter(writer, output) finish.Unlock() } diff --git a/proxy/vmess/vmessin_udp.go b/proxy/vmess/vmessin_udp.go index 11b455408..3c9729d35 100644 --- a/proxy/vmess/vmessin_udp.go +++ b/proxy/vmess/vmessin_udp.go @@ -5,6 +5,7 @@ import ( "crypto/md5" "net" + "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -55,14 +56,15 @@ func (handler *VMessInboundHandler) AcceptPackets(conn *net.UDPConn) { continue } - data := make([]byte, bufferSize) - nBytes, err = cryptReader.Read(data) + data := alloc.NewBuffer() + nBytes, err = cryptReader.Read(data.Value) if err != nil { log.Warning("VMessIn: Unable to decrypt data: %v", err) continue } + data.Slice(0, nBytes) - packet := v2net.NewPacket(request.Destination(), data[:nBytes], false) + packet := v2net.NewPacket(request.Destination(), data, false) go handler.handlePacket(conn, request, packet, addr) } } @@ -87,7 +89,9 @@ func (handler *VMessInboundHandler) handlePacket(conn *net.UDPConn, request *pro if data, ok := <-ray.InboundOutput(); ok { hasData = true - responseWriter.Write(data) + responseWriter.Write(data.Value) + data.Release() + data = nil } if hasData { diff --git a/proxy/vmess/vmessout.go b/proxy/vmess/vmessout.go index 08e761925..1ef4e0ebc 100644 --- a/proxy/vmess/vmessout.go +++ b/proxy/vmess/vmessout.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -121,7 +122,7 @@ func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ra return nil } -func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2net.Packet, input <-chan []byte, finish *sync.Mutex) { +func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2net.Packet, input <-chan *alloc.Buffer, finish *sync.Mutex) { defer finish.Unlock() encryptRequestWriter, err := v2io.NewAesEncryptWriter(request.RequestKey[:], request.RequestIV[:], conn) if err != nil { @@ -129,8 +130,9 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2 return } - buffer := make([]byte, 0, 2*1024) - buffer, err = request.ToBytes(user.NewTimeHash(user.HMACHash{}), user.GenerateRandomInt64InRange, buffer) + buffer := alloc.NewBuffer() + buffer.Clear() + requestBytes, err := request.ToBytes(user.NewTimeHash(user.HMACHash{}), user.GenerateRandomInt64InRange, buffer.Value) if err != nil { log.Error("VMessOut: Failed to serialize VMess request: %v", err) return @@ -145,10 +147,14 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2 } if firstChunk != nil { - encryptRequestWriter.Crypt(firstChunk) - buffer = append(buffer, firstChunk...) + encryptRequestWriter.Crypt(firstChunk.Value) + requestBytes = append(requestBytes, firstChunk.Value...) + firstChunk.Release() + firstChunk = nil - _, err = conn.Write(buffer) + _, err = conn.Write(requestBytes) + buffer.Release() + buffer = nil if err != nil { log.Error("VMessOut: Failed to write VMess request: %v", err) return @@ -161,7 +167,7 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2 return } -func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- []byte, finish *sync.Mutex, isUDP bool) { +func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- *alloc.Buffer, finish *sync.Mutex, isUDP bool) { defer finish.Unlock() defer close(output) responseKey := md5.Sum(request.RequestKey[:]) @@ -173,18 +179,19 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- return } - buffer, err := v2net.ReadFrom(decryptResponseReader, 4) + buffer, err := v2net.ReadFrom(decryptResponseReader, nil) if err != nil { - log.Error("VMessOut: Failed to read VMess response (%d bytes): %v", len(buffer), err) + log.Error("VMessOut: Failed to read VMess response (%d bytes): %v", buffer.Len(), err) return } - if len(buffer) < 4 || !bytes.Equal(buffer[:4], request.ResponseHeader[:]) { + if buffer.Len() < 4 || !bytes.Equal(buffer.Value[:4], request.ResponseHeader[:]) { log.Warning("VMessOut: unexepcted response header. The connection is probably hijacked.") return } - log.Info("VMessOut received %d bytes from %s", len(buffer)-4, conn.RemoteAddr().String()) + log.Info("VMessOut received %d bytes from %s", buffer.Len()-4, conn.RemoteAddr().String()) - output <- buffer[4:] + buffer.SliceFrom(4) + output <- buffer if !isUDP { v2net.ReaderToChan(output, decryptResponseReader) diff --git a/ray.go b/ray.go index 6598af773..663b2400f 100644 --- a/ray.go +++ b/ray.go @@ -1,44 +1,48 @@ package core +import ( + "github.com/v2ray/v2ray-core/common/alloc" +) + const ( bufferSize = 16 ) // Ray is an internal tranport channel bewteen inbound and outbound connection. type Ray struct { - Input chan []byte - Output chan []byte + Input chan *alloc.Buffer + Output chan *alloc.Buffer } func NewRay() *Ray { return &Ray{ - Input: make(chan []byte, bufferSize), - Output: make(chan []byte, bufferSize), + Input: make(chan *alloc.Buffer, bufferSize), + Output: make(chan *alloc.Buffer, bufferSize), } } type OutboundRay interface { - OutboundInput() <-chan []byte - OutboundOutput() chan<- []byte + OutboundInput() <-chan *alloc.Buffer + OutboundOutput() chan<- *alloc.Buffer } type InboundRay interface { - InboundInput() chan<- []byte - InboundOutput() <-chan []byte + InboundInput() chan<- *alloc.Buffer + InboundOutput() <-chan *alloc.Buffer } -func (ray *Ray) OutboundInput() <-chan []byte { +func (ray *Ray) OutboundInput() <-chan *alloc.Buffer { return ray.Input } -func (ray *Ray) OutboundOutput() chan<- []byte { +func (ray *Ray) OutboundOutput() chan<- *alloc.Buffer { return ray.Output } -func (ray *Ray) InboundInput() chan<- []byte { +func (ray *Ray) InboundInput() chan<- *alloc.Buffer { return ray.Input } -func (ray *Ray) InboundOutput() <-chan []byte { +func (ray *Ray) InboundOutput() <-chan *alloc.Buffer { return ray.Output } diff --git a/testing/mocks/inboundhandler.go b/testing/mocks/inboundhandler.go index 0fb73e48f..a8e330e86 100644 --- a/testing/mocks/inboundhandler.go +++ b/testing/mocks/inboundhandler.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" ) @@ -25,7 +26,10 @@ func (handler *InboundConnectionHandler) Communicate(packet v2net.Packet) error input := ray.InboundInput() output := ray.InboundOutput() - input <- handler.Data2Send + buffer := alloc.NewBuffer() + buffer.Clear() + buffer.Append(handler.Data2Send) + input <- buffer close(input) v2net.ChanToWriter(handler.DataReturned, output) diff --git a/testing/mocks/outboundhandler.go b/testing/mocks/outboundhandler.go index 0a3ef47d8..75e9489bb 100644 --- a/testing/mocks/outboundhandler.go +++ b/testing/mocks/outboundhandler.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/v2ray/v2ray-core" + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" ) @@ -19,7 +20,7 @@ func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core handler.Destination = packet.Destination() if packet.Chunk() != nil { - handler.Data2Send.Write(packet.Chunk()) + handler.Data2Send.Write(packet.Chunk().Value) } go func() { @@ -28,11 +29,13 @@ func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core if !open { break } - handler.Data2Send.Write(data) + handler.Data2Send.Write(data.Value) + data.Release() } - dataCopy := make([]byte, len(handler.Data2Return)) - copy(dataCopy, handler.Data2Return) - output <- dataCopy + response := alloc.NewBuffer() + response.Clear() + response.Append(handler.Data2Return) + output <- response close(output) }()