1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-17 06:46:33 -05:00

Maintain an internal buffer pool to accelerate allocation

This commit is contained in:
V2Ray 2015-10-08 14:46:18 +02:00
parent 71df5103cd
commit 9ee73c4f6b
16 changed files with 241 additions and 122 deletions

104
common/alloc/buffer.go Normal file
View File

@ -0,0 +1,104 @@
package alloc
import (
//"fmt"
"time"
)
type Buffer struct {
head []byte
pool *bufferPool
Value []byte
}
func (b *Buffer) Release() {
b.pool.free(b)
}
func (b *Buffer) Clear() {
b.Value = b.Value[:0]
}
func (b *Buffer) Append(data []byte) {
b.Value = append(b.Value, data...)
}
func (b *Buffer) Slice(from, to int) {
b.Value = b.Value[from:to]
}
func (b *Buffer) SliceFrom(from int) {
b.Value = b.Value[from:]
}
func (b *Buffer) Len() int {
return len(b.Value)
}
type bufferPool struct {
chain chan *Buffer
allocator func(*bufferPool) *Buffer
minElements int
maxElements int
}
func newBufferPool(allocator func(*bufferPool) *Buffer, minElements, maxElements int) *bufferPool {
pool := &bufferPool{
chain: make(chan *Buffer, maxElements*2),
allocator: allocateSmall,
minElements: minElements,
maxElements: maxElements,
}
for i := 0; i < minElements; i++ {
pool.chain <- allocator(pool)
}
go pool.cleanup(time.Tick(1 * time.Second))
return pool
}
func (p *bufferPool) allocate() *Buffer {
//fmt.Printf("Pool size: %d\n", len(p.chain))
var b *Buffer
select {
case b = <-p.chain:
default:
b = p.allocator(p)
}
b.Value = b.head
return b
}
func (p *bufferPool) free(buffer *Buffer) {
select {
case p.chain <- buffer:
default:
}
//fmt.Printf("Pool size: %d\n", len(p.chain))
}
func (p *bufferPool) cleanup(tick <-chan time.Time) {
for range tick {
pSize := len(p.chain)
for delta := pSize - p.minElements; delta > 0; delta-- {
p.chain <- p.allocator(p)
}
for delta := p.maxElements - pSize; delta > 0; delta-- {
<-p.chain
}
}
}
func allocateSmall(pool *bufferPool) *Buffer {
b := &Buffer{
head: make([]byte, 8*1024),
}
b.Value = b.head
b.pool = pool
return b
}
var smallPool = newBufferPool(allocateSmall, 256, 1024)
func NewBuffer() *Buffer {
return smallPool.allocate()
}

View File

@ -1,12 +1,16 @@
package net package net
import (
"github.com/v2ray/v2ray-core/common/alloc"
)
type Packet interface { type Packet interface {
Destination() Destination Destination() Destination
Chunk() []byte // First chunk of this commnunication Chunk() *alloc.Buffer // First chunk of this commnunication
MoreChunks() bool MoreChunks() bool
} }
func NewPacket(dest Destination, firstChunk []byte, moreChunks bool) Packet { func NewPacket(dest Destination, firstChunk *alloc.Buffer, moreChunks bool) Packet {
return &packetImpl{ return &packetImpl{
dest: dest, dest: dest,
data: firstChunk, data: firstChunk,
@ -16,7 +20,7 @@ func NewPacket(dest Destination, firstChunk []byte, moreChunks bool) Packet {
type packetImpl struct { type packetImpl struct {
dest Destination dest Destination
data []byte data *alloc.Buffer
moreData bool moreData bool
} }
@ -24,7 +28,7 @@ func (packet *packetImpl) Destination() Destination {
return packet.dest return packet.dest
} }
func (packet *packetImpl) Chunk() []byte { func (packet *packetImpl) Chunk() *alloc.Buffer {
return packet.data return packet.data
} }

View File

@ -2,63 +2,41 @@ package net
import ( import (
"io" "io"
"github.com/v2ray/v2ray-core/common/alloc"
) )
const ( func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
minBufferSizeKilo = 2 if buffer == nil {
maxBufferSizeKilo = 128 buffer = alloc.NewBuffer()
)
func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) {
buffer := make([]byte, sizeInKilo<<10)
nBytes, err := reader.Read(buffer)
if nBytes == 0 {
return nil, err
} }
return buffer[:nBytes], err nBytes, err := reader.Read(buffer.Value)
} buffer.Slice(0, nBytes)
return buffer, err
func roundUp(size int) int {
if size <= minBufferSizeKilo {
return minBufferSizeKilo
}
if size >= maxBufferSizeKilo {
return maxBufferSizeKilo
}
size--
size |= size >> 1
size |= size >> 2
size |= size >> 4
return size + 1
} }
// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
func ReaderToChan(stream chan<- []byte, reader io.Reader) error { func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
bufferSizeKilo := 2
for { for {
data, err := ReadFrom(reader, bufferSizeKilo) buffer, err := ReadFrom(reader, nil)
if len(data) > 0 { if buffer.Len() > 0 {
stream <- data stream <- buffer
} else {
buffer.Release()
buffer = nil
} }
if err != nil { if err != nil {
return err return err
} }
if bufferSizeKilo == maxBufferSizeKilo {
continue
}
dataLenKilo := len(data) >> 10
if dataLenKilo == bufferSizeKilo {
bufferSizeKilo <<= 1
} else {
bufferSizeKilo = roundUp(dataLenKilo)
}
} }
} }
// ChanToWriter dumps all content from a given chan to a writer until the chan is closed. // ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
func ChanToWriter(writer io.Writer, stream <-chan []byte) error { func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
for buffer := range stream { for buffer := range stream {
_, err := writer.Write(buffer) _, err := writer.Write(buffer.Value)
buffer.Release()
buffer = nil
if err != nil { if err != nil {
return err return err
} }

View File

@ -7,6 +7,7 @@ import (
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/testing/unit" "github.com/v2ray/v2ray-core/testing/unit"
) )
@ -22,7 +23,7 @@ func TestReaderAndWrite(t *testing.T) {
readerBuffer := bytes.NewReader(buffer) readerBuffer := bytes.NewReader(buffer)
writerBuffer := bytes.NewBuffer(make([]byte, 0, size)) writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
transportChan := make(chan []byte, 1024) transportChan := make(chan *alloc.Buffer, 1024)
err = ReaderToChan(transportChan, readerBuffer) err = ReaderToChan(transportChan, readerBuffer)
assert.Error(err).Equals(io.EOF) assert.Error(err).Equals(io.EOF)
@ -44,7 +45,7 @@ func (reader *StaticReader) Read(b []byte) (size int, err error) {
if size > reader.total-reader.current { if size > reader.total-reader.current {
size = reader.total - reader.current size = reader.total - reader.current
} }
for i := 0; i < len(b); i++ { for i := 0; i < size; i++ {
b[i] = byte(i) b[i] = byte(i)
} }
//rand.Read(b[:size]) //rand.Read(b[:size])
@ -113,8 +114,8 @@ func BenchmarkTransport10M(b *testing.B) {
func runBenchmarkTransport(size int) { func runBenchmarkTransport(size int) {
transportChanA := make(chan []byte, 16) transportChanA := make(chan *alloc.Buffer, 16)
transportChanB := make(chan []byte, 16) transportChanB := make(chan *alloc.Buffer, 16)
readerA := &StaticReader{size, 0} readerA := &StaticReader{size, 0}
readerB := &StaticReader{size, 0} readerB := &StaticReader{size, 0}

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
) )
@ -33,7 +34,9 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.Outb
writeMutex.Lock() writeMutex.Lock()
if chunk := firstPacket.Chunk(); chunk != nil { if chunk := firstPacket.Chunk(); chunk != nil {
conn.Write(chunk) conn.Write(chunk.Value)
chunk.Release()
chunk = nil
} }
if !firstPacket.MoreChunks() { if !firstPacket.MoreChunks() {
@ -56,23 +59,22 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.Outb
return nil return nil
} }
func dumpInput(conn net.Conn, input <-chan []byte, finish *sync.Mutex) { func dumpInput(conn net.Conn, input <-chan *alloc.Buffer, finish *sync.Mutex) {
v2net.ChanToWriter(conn, input) v2net.ChanToWriter(conn, input)
finish.Unlock() finish.Unlock()
} }
func dumpOutput(conn net.Conn, output chan<- []byte, finish *sync.Mutex, udp bool) { func dumpOutput(conn net.Conn, output chan<- *alloc.Buffer, finish *sync.Mutex, udp bool) {
defer finish.Unlock() defer finish.Unlock()
defer close(output) defer close(output)
bufferSize := 4 /* KB */ response, err := v2net.ReadFrom(conn, nil)
if udp { log.Info("Freedom receives %d bytes from %s", response.Len(), conn.RemoteAddr().String())
bufferSize = 2 if response.Len() > 0 {
}
response, err := v2net.ReadFrom(conn, bufferSize)
log.Info("Freedom receives %d bytes from %s", len(response), conn.RemoteAddr().String())
if len(response) > 0 {
output <- response output <- response
} else {
response.Release()
response = nil
} }
if err != nil { if err != nil {
return return

View File

@ -9,6 +9,7 @@ import (
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
_ "github.com/v2ray/v2ray-core/proxy/socks" _ "github.com/v2ray/v2ray-core/proxy/socks"
"github.com/v2ray/v2ray-core/proxy/socks/config/json" "github.com/v2ray/v2ray-core/proxy/socks/config/json"
@ -62,8 +63,11 @@ func TestUDPSend(t *testing.T) {
err = point.Start() err = point.Start()
assert.Error(err).IsNil() assert.Error(err).IsNil()
data2SendBuffer := alloc.NewBuffer()
data2SendBuffer.Clear()
data2SendBuffer.Append([]byte(data2Send))
dest := v2net.NewUDPDestination(udpServerAddr) dest := v2net.NewUDPDestination(udpServerAddr)
ich.Communicate(v2net.NewPacket(dest, []byte(data2Send), false)) ich.Communicate(v2net.NewPacket(dest, data2SendBuffer, false))
assert.Bytes(ich.DataReturned.Bytes()).Equals([]byte("Processed: Data to be sent to remote")) assert.Bytes(ich.DataReturned.Bytes()).Equals([]byte("Processed: Data to be sent to remote"))
} }

View File

@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
) )
@ -15,7 +16,7 @@ var (
type Socks5UDPRequest struct { type Socks5UDPRequest struct {
Fragment byte Fragment byte
Address v2net.Address Address v2net.Address
Data []byte Data *alloc.Buffer
} }
func (request *Socks5UDPRequest) Destination() v2net.Destination { func (request *Socks5UDPRequest) Destination() v2net.Destination {
@ -40,7 +41,7 @@ func (request *Socks5UDPRequest) Bytes(buffer []byte) []byte {
buffer = append(buffer, []byte(request.Address.Domain())...) buffer = append(buffer, []byte(request.Address.Domain())...)
} }
buffer = append(buffer, request.Address.PortBytes()...) buffer = append(buffer, request.Address.PortBytes()...)
buffer = append(buffer, request.Data...) buffer = append(buffer, request.Data.Value...)
return buffer return buffer
} }
@ -74,8 +75,9 @@ func ReadUDPRequest(packet []byte) (request Socks5UDPRequest, err error) {
return return
} }
request.Data = make([]byte, len(packet)-dataBegin) request.Data = alloc.NewBuffer()
copy(request.Data, packet[dataBegin:]) request.Data.Clear()
request.Data.Append(packet[dataBegin:])
return return
} }

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/errors" "github.com/v2ray/v2ray-core/common/errors"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -158,7 +159,7 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W
} }
dest := request.Destination() dest := request.Destination()
data, err := v2net.ReadFrom(reader, 4) data, err := v2net.ReadFrom(reader, nil)
if err != nil { if err != nil {
return err return err
} }
@ -192,8 +193,8 @@ func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writ
return err return err
} }
reader.SetTimeOut(300) /* 5 minutes */ reader.SetTimeOut(300) /* 5 minutes */
v2net.ReadFrom(reader, 1) // Just in case of anything left in the socket v2net.ReadFrom(reader, nil) // Just in case of anything left in the socket
// The TCP connection closes after this method returns. We need to wait until // The TCP connection closes after this method returns. We need to wait until
// the client closes it. // the client closes it.
// TODO: get notified from UDP part // TODO: get notified from UDP part
@ -215,7 +216,7 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth
} }
dest := v2net.NewTCPDestination(v2net.IPAddress(auth.IP[:], auth.Port)) dest := v2net.NewTCPDestination(v2net.IPAddress(auth.IP[:], auth.Port))
data, err := v2net.ReadFrom(reader, 4) data, err := v2net.ReadFrom(reader, nil)
if err != nil { if err != nil {
return err return err
} }
@ -239,13 +240,13 @@ func (server *SocksServer) transport(reader io.Reader, writer io.Writer, firstPa
outputFinish.Lock() outputFinish.Lock()
} }
func dumpInput(reader io.Reader, input chan<- []byte, finish *sync.Mutex) { func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
v2net.ReaderToChan(input, reader) v2net.ReaderToChan(input, reader)
finish.Unlock() finish.Unlock()
close(input) close(input)
} }
func dumpOutput(writer io.Writer, output <-chan []byte, finish *sync.Mutex) { func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
v2net.ChanToWriter(writer, output) v2net.ChanToWriter(writer, output)
finish.Unlock() finish.Unlock()
} }

View File

@ -3,6 +3,7 @@ package socks
import ( import (
"net" "net"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy/socks/protocol" "github.com/v2ray/v2ray-core/proxy/socks/protocol"
@ -38,14 +39,16 @@ func (server *SocksServer) getUDPAddr() v2net.Address {
func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error { func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error {
for { for {
buffer := make([]byte, bufferSize) buffer := alloc.NewBuffer()
nBytes, addr, err := conn.ReadFromUDP(buffer) defer buffer.Release()
nBytes, addr, err := conn.ReadFromUDP(buffer.Value)
if err != nil { if err != nil {
log.Error("Socks failed to read UDP packets: %v", err) log.Error("Socks failed to read UDP packets: %v", err)
continue continue
} }
buffer.Slice(0, nBytes)
log.Info("Client UDP connection from %v", addr) log.Info("Client UDP connection from %v", addr)
request, err := protocol.ReadUDPRequest(buffer[:nBytes]) request, err := protocol.ReadUDPRequest(buffer.Value)
if err != nil { if err != nil {
log.Error("Socks failed to parse UDP request: %v", err) log.Error("Socks failed to parse UDP request: %v", err)
continue continue
@ -57,7 +60,7 @@ func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error {
} }
udpPacket := v2net.NewPacket(request.Destination(), request.Data, false) udpPacket := v2net.NewPacket(request.Destination(), request.Data, false)
log.Info("Send packet to %s with %d bytes", udpPacket.Destination().String(), len(request.Data)) log.Info("Send packet to %s with %d bytes", udpPacket.Destination().String(), request.Data.Len())
go server.handlePacket(conn, udpPacket, addr, request.Address) go server.handlePacket(conn, udpPacket, addr, request.Address)
} }
} }
@ -72,9 +75,11 @@ func (server *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet,
Address: targetAddr, Address: targetAddr,
Data: data, Data: data,
} }
log.Info("Writing back UDP response with %d bytes from %s to %s", len(data), targetAddr.String(), clientAddr.String()) log.Info("Writing back UDP response with %d bytes from %s to %s", data.Len(), targetAddr.String(), clientAddr.String())
udpMessage := response.Bytes(nil) udpMessage := response.Bytes(nil)
nBytes, err := conn.WriteToUDP(udpMessage, clientAddr) nBytes, err := conn.WriteToUDP(udpMessage, clientAddr)
response.Data.Release()
response.Data = nil
if err != nil { if err != nil {
log.Error("Socks failed to write UDP message (%d bytes) to %s: %v", nBytes, clientAddr.String(), err) log.Error("Socks failed to write UDP message (%d bytes) to %s: %v", nBytes, clientAddr.String(), err)
} }

View File

@ -5,6 +5,7 @@ import (
"testing" "testing"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/testing/mocks" "github.com/v2ray/v2ray-core/testing/mocks"
"github.com/v2ray/v2ray-core/testing/unit" "github.com/v2ray/v2ray-core/testing/unit"
@ -163,8 +164,11 @@ func TestVMessInAndOutUDP(t *testing.T) {
err = pointB.Start() err = pointB.Start()
assert.Error(err).IsNil() assert.Error(err).IsNil()
data2SendBuffer := alloc.NewBuffer()
data2SendBuffer.Clear()
data2SendBuffer.Append([]byte(data2Send))
dest := v2net.NewUDPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}, 80)) dest := v2net.NewUDPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}, 80))
ich.Communicate(v2net.NewPacket(dest, []byte(data2Send), false)) ich.Communicate(v2net.NewPacket(dest, data2SendBuffer, false))
assert.Bytes([]byte(data2Send)).Equals(och.Data2Send.Bytes()) assert.Bytes([]byte(data2Send)).Equals(och.Data2Send.Bytes())
assert.Bytes(ich.DataReturned.Bytes()).Equals(och.Data2Return) assert.Bytes(ich.DataReturned.Bytes()).Equals(och.Data2Return)
} }

View File

@ -5,9 +5,9 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"time"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -15,14 +15,6 @@ import (
"github.com/v2ray/v2ray-core/proxy/vmess/protocol/user" "github.com/v2ray/v2ray-core/proxy/vmess/protocol/user"
) )
const (
requestReadTimeOut = 4 * time.Second
)
var (
zeroTime time.Time
)
type VMessInboundHandler struct { type VMessInboundHandler struct {
vPoint *core.Point vPoint *core.Point
clients user.UserSet clients user.UserSet
@ -103,7 +95,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er
buffer = append(buffer, request.ResponseHeader...) buffer = append(buffer, request.ResponseHeader...)
if data, open := <-output; open { if data, open := <-output; open {
buffer = append(buffer, data...) buffer = append(buffer, data.Value...)
data = nil data = nil
responseWriter.Write(buffer) responseWriter.Write(buffer)
buffer = nil buffer = nil
@ -117,7 +109,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er
return nil return nil
} }
func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- []byte, finish *sync.Mutex) { func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
defer close(input) defer close(input)
defer finish.Unlock() defer finish.Unlock()
@ -130,7 +122,7 @@ func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<-
v2net.ReaderToChan(input, requestReader) v2net.ReaderToChan(input, requestReader)
} }
func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan []byte, finish *sync.Mutex) { func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
v2net.ChanToWriter(writer, output) v2net.ChanToWriter(writer, output)
finish.Unlock() finish.Unlock()
} }

View File

@ -5,6 +5,7 @@ import (
"crypto/md5" "crypto/md5"
"net" "net"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -55,14 +56,15 @@ func (handler *VMessInboundHandler) AcceptPackets(conn *net.UDPConn) {
continue continue
} }
data := make([]byte, bufferSize) data := alloc.NewBuffer()
nBytes, err = cryptReader.Read(data) nBytes, err = cryptReader.Read(data.Value)
if err != nil { if err != nil {
log.Warning("VMessIn: Unable to decrypt data: %v", err) log.Warning("VMessIn: Unable to decrypt data: %v", err)
continue continue
} }
data.Slice(0, nBytes)
packet := v2net.NewPacket(request.Destination(), data[:nBytes], false) packet := v2net.NewPacket(request.Destination(), data, false)
go handler.handlePacket(conn, request, packet, addr) go handler.handlePacket(conn, request, packet, addr)
} }
} }
@ -87,7 +89,9 @@ func (handler *VMessInboundHandler) handlePacket(conn *net.UDPConn, request *pro
if data, ok := <-ray.InboundOutput(); ok { if data, ok := <-ray.InboundOutput(); ok {
hasData = true hasData = true
responseWriter.Write(data) responseWriter.Write(data.Value)
data.Release()
data = nil
} }
if hasData { if hasData {

View File

@ -9,6 +9,7 @@ import (
"sync" "sync"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -121,7 +122,7 @@ func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ra
return nil return nil
} }
func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2net.Packet, input <-chan []byte, finish *sync.Mutex) { func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2net.Packet, input <-chan *alloc.Buffer, finish *sync.Mutex) {
defer finish.Unlock() defer finish.Unlock()
encryptRequestWriter, err := v2io.NewAesEncryptWriter(request.RequestKey[:], request.RequestIV[:], conn) encryptRequestWriter, err := v2io.NewAesEncryptWriter(request.RequestKey[:], request.RequestIV[:], conn)
if err != nil { if err != nil {
@ -129,8 +130,9 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2
return return
} }
buffer := make([]byte, 0, 2*1024) buffer := alloc.NewBuffer()
buffer, err = request.ToBytes(user.NewTimeHash(user.HMACHash{}), user.GenerateRandomInt64InRange, buffer) buffer.Clear()
requestBytes, err := request.ToBytes(user.NewTimeHash(user.HMACHash{}), user.GenerateRandomInt64InRange, buffer.Value)
if err != nil { if err != nil {
log.Error("VMessOut: Failed to serialize VMess request: %v", err) log.Error("VMessOut: Failed to serialize VMess request: %v", err)
return return
@ -145,10 +147,14 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2
} }
if firstChunk != nil { if firstChunk != nil {
encryptRequestWriter.Crypt(firstChunk) encryptRequestWriter.Crypt(firstChunk.Value)
buffer = append(buffer, firstChunk...) requestBytes = append(requestBytes, firstChunk.Value...)
firstChunk.Release()
firstChunk = nil
_, err = conn.Write(buffer) _, err = conn.Write(requestBytes)
buffer.Release()
buffer = nil
if err != nil { if err != nil {
log.Error("VMessOut: Failed to write VMess request: %v", err) log.Error("VMessOut: Failed to write VMess request: %v", err)
return return
@ -161,7 +167,7 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2
return return
} }
func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- []byte, finish *sync.Mutex, isUDP bool) { func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- *alloc.Buffer, finish *sync.Mutex, isUDP bool) {
defer finish.Unlock() defer finish.Unlock()
defer close(output) defer close(output)
responseKey := md5.Sum(request.RequestKey[:]) responseKey := md5.Sum(request.RequestKey[:])
@ -173,18 +179,19 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<-
return return
} }
buffer, err := v2net.ReadFrom(decryptResponseReader, 4) buffer, err := v2net.ReadFrom(decryptResponseReader, nil)
if err != nil { if err != nil {
log.Error("VMessOut: Failed to read VMess response (%d bytes): %v", len(buffer), err) log.Error("VMessOut: Failed to read VMess response (%d bytes): %v", buffer.Len(), err)
return return
} }
if len(buffer) < 4 || !bytes.Equal(buffer[:4], request.ResponseHeader[:]) { if buffer.Len() < 4 || !bytes.Equal(buffer.Value[:4], request.ResponseHeader[:]) {
log.Warning("VMessOut: unexepcted response header. The connection is probably hijacked.") log.Warning("VMessOut: unexepcted response header. The connection is probably hijacked.")
return return
} }
log.Info("VMessOut received %d bytes from %s", len(buffer)-4, conn.RemoteAddr().String()) log.Info("VMessOut received %d bytes from %s", buffer.Len()-4, conn.RemoteAddr().String())
output <- buffer[4:] buffer.SliceFrom(4)
output <- buffer
if !isUDP { if !isUDP {
v2net.ReaderToChan(output, decryptResponseReader) v2net.ReaderToChan(output, decryptResponseReader)

28
ray.go
View File

@ -1,44 +1,48 @@
package core package core
import (
"github.com/v2ray/v2ray-core/common/alloc"
)
const ( const (
bufferSize = 16 bufferSize = 16
) )
// Ray is an internal tranport channel bewteen inbound and outbound connection. // Ray is an internal tranport channel bewteen inbound and outbound connection.
type Ray struct { type Ray struct {
Input chan []byte Input chan *alloc.Buffer
Output chan []byte Output chan *alloc.Buffer
} }
func NewRay() *Ray { func NewRay() *Ray {
return &Ray{ return &Ray{
Input: make(chan []byte, bufferSize), Input: make(chan *alloc.Buffer, bufferSize),
Output: make(chan []byte, bufferSize), Output: make(chan *alloc.Buffer, bufferSize),
} }
} }
type OutboundRay interface { type OutboundRay interface {
OutboundInput() <-chan []byte OutboundInput() <-chan *alloc.Buffer
OutboundOutput() chan<- []byte OutboundOutput() chan<- *alloc.Buffer
} }
type InboundRay interface { type InboundRay interface {
InboundInput() chan<- []byte InboundInput() chan<- *alloc.Buffer
InboundOutput() <-chan []byte InboundOutput() <-chan *alloc.Buffer
} }
func (ray *Ray) OutboundInput() <-chan []byte { func (ray *Ray) OutboundInput() <-chan *alloc.Buffer {
return ray.Input return ray.Input
} }
func (ray *Ray) OutboundOutput() chan<- []byte { func (ray *Ray) OutboundOutput() chan<- *alloc.Buffer {
return ray.Output return ray.Output
} }
func (ray *Ray) InboundInput() chan<- []byte { func (ray *Ray) InboundInput() chan<- *alloc.Buffer {
return ray.Input return ray.Input
} }
func (ray *Ray) InboundOutput() <-chan []byte { func (ray *Ray) InboundOutput() <-chan *alloc.Buffer {
return ray.Output return ray.Output
} }

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
) )
@ -25,7 +26,10 @@ func (handler *InboundConnectionHandler) Communicate(packet v2net.Packet) error
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()
input <- handler.Data2Send buffer := alloc.NewBuffer()
buffer.Clear()
buffer.Append(handler.Data2Send)
input <- buffer
close(input) close(input)
v2net.ChanToWriter(handler.DataReturned, output) v2net.ChanToWriter(handler.DataReturned, output)

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
) )
@ -19,7 +20,7 @@ func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core
handler.Destination = packet.Destination() handler.Destination = packet.Destination()
if packet.Chunk() != nil { if packet.Chunk() != nil {
handler.Data2Send.Write(packet.Chunk()) handler.Data2Send.Write(packet.Chunk().Value)
} }
go func() { go func() {
@ -28,11 +29,13 @@ func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core
if !open { if !open {
break break
} }
handler.Data2Send.Write(data) handler.Data2Send.Write(data.Value)
data.Release()
} }
dataCopy := make([]byte, len(handler.Data2Return)) response := alloc.NewBuffer()
copy(dataCopy, handler.Data2Return) response.Clear()
output <- dataCopy response.Append(handler.Data2Return)
output <- response
close(output) close(output)
}() }()