1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-01 23:16:23 -05:00

Remove v2net.Packet

This commit is contained in:
v2ray 2016-04-26 00:13:26 +02:00
parent 7db14dad9b
commit 42907ff2e8
19 changed files with 119 additions and 216 deletions

View File

@ -12,11 +12,11 @@ const (
// PacketDispatcher dispatch a packet and possibly further network payload to its destination. // PacketDispatcher dispatch a packet and possibly further network payload to its destination.
type PacketDispatcher interface { type PacketDispatcher interface {
DispatchToOutbound(packet v2net.Packet) ray.InboundRay DispatchToOutbound(destination v2net.Destination) ray.InboundRay
} }
type packetDispatcherWithContext interface { type packetDispatcherWithContext interface {
DispatchToOutbound(context app.Context, packet v2net.Packet) ray.InboundRay DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay
} }
type contextedPacketDispatcher struct { type contextedPacketDispatcher struct {
@ -24,8 +24,8 @@ type contextedPacketDispatcher struct {
packetDispatcher packetDispatcherWithContext packetDispatcher packetDispatcherWithContext
} }
func (this *contextedPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay { func (this *contextedPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay {
return this.packetDispatcher.DispatchToOutbound(this.context, packet) return this.packetDispatcher.DispatchToOutbound(this.context, destination)
} }
func init() { func init() {

View File

@ -7,12 +7,12 @@ import (
type TestPacketDispatcher struct { type TestPacketDispatcher struct {
Destination chan v2net.Destination Destination chan v2net.Destination
Handler func(packet v2net.Packet, traffic ray.OutboundRay) Handler func(destination v2net.Destination, traffic ray.OutboundRay)
} }
func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.OutboundRay)) *TestPacketDispatcher { func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic ray.OutboundRay)) *TestPacketDispatcher {
if handler == nil { if handler == nil {
handler = func(packet v2net.Packet, traffic ray.OutboundRay) { handler = func(destination v2net.Destination, traffic ray.OutboundRay) {
for { for {
payload, err := traffic.OutboundInput().Read() payload, err := traffic.OutboundInput().Read()
if err != nil { if err != nil {
@ -29,10 +29,10 @@ func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.Outbo
} }
} }
func (this *TestPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay { func (this *TestPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay {
traffic := ray.NewRay() traffic := ray.NewRay()
this.Destination <- packet.Destination() this.Destination <- destination
go this.Handler(packet, traffic) go this.Handler(destination, traffic)
return traffic return traffic
} }

View File

@ -1,47 +0,0 @@
package net
import (
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc"
)
// Packet is a network packet to be sent to destination.
type Packet interface {
common.Releasable
Destination() Destination
Chunk() *alloc.Buffer // First chunk of this commnunication
MoreChunks() bool
}
// NewPacket creates a new Packet with given destination and payload.
func NewPacket(dest Destination, firstChunk *alloc.Buffer, moreChunks bool) Packet {
return &packetImpl{
dest: dest,
data: firstChunk,
moreData: moreChunks,
}
}
type packetImpl struct {
dest Destination
data *alloc.Buffer
moreData bool
}
func (packet *packetImpl) Destination() Destination {
return packet.dest
}
func (packet *packetImpl) Chunk() *alloc.Buffer {
return packet.data
}
func (packet *packetImpl) MoreChunks() bool {
return packet.moreData
}
func (packet *packetImpl) Release() {
packet.data.Release()
packet.data = nil
}

View File

@ -2,6 +2,7 @@ package blackhole
import ( import (
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal" "github.com/v2ray/v2ray-core/proxy/internal"
@ -16,8 +17,8 @@ func NewBlackHole() *BlackHole {
return &BlackHole{} return &BlackHole{}
} }
func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { func (this *BlackHole) Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
firstPacket.Release() payload.Release()
ray.OutboundOutput().Close() ray.OutboundOutput().Close()
ray.OutboundOutput().Release() ray.OutboundOutput().Release()

View File

@ -95,16 +95,14 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error {
} }
func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) { func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) {
packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), payload, false) this.udpServer.Dispatch(dest, v2net.UDPDestination(this.address, this.port), payload, func(destination v2net.Destination, payload *alloc.Buffer) {
this.udpServer.Dispatch(dest, packet, func(packet v2net.Packet) { defer payload.Release()
defer packet.Chunk().Release()
this.udpMutex.RLock() this.udpMutex.RLock()
defer this.udpMutex.RUnlock()
if !this.accepting { if !this.accepting {
this.udpMutex.RUnlock()
return return
} }
this.udpHub.WriteTo(packet.Chunk().Value, packet.Destination()) this.udpHub.WriteTo(payload.Value, destination)
this.udpMutex.RUnlock()
}) })
} }
@ -123,8 +121,7 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error {
func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) { func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) {
defer conn.Close() defer conn.Close()
packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true) ray := this.packetDispatcher.DispatchToOutbound(v2net.TCPDestination(this.address, this.port))
ray := this.packetDispatcher.DispatchToOutbound(packet)
defer ray.InboundOutput().Release() defer ray.InboundOutput().Release()
var inputFinish, outputFinish sync.Mutex var inputFinish, outputFinish sync.Mutex

View File

@ -5,6 +5,7 @@ import (
"net" "net"
"sync" "sync"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -16,15 +17,15 @@ import (
type FreedomConnection struct { type FreedomConnection struct {
} }
func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
log.Info("Freedom: Opening connection to ", firstPacket.Destination()) log.Info("Freedom: Opening connection to ", destination)
defer firstPacket.Release() defer payload.Release()
defer ray.OutboundInput().Release() defer ray.OutboundInput().Release()
var conn net.Conn var conn net.Conn
err := retry.Timed(5, 100).On(func() error { err := retry.Timed(5, 100).On(func() error {
rawConn, err := dialer.Dial(firstPacket.Destination()) rawConn, err := dialer.Dial(destination)
if err != nil { if err != nil {
return err return err
} }
@ -32,7 +33,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
return nil return nil
}) })
if err != nil { if err != nil {
log.Error("Freedom: Failed to open connection to ", firstPacket.Destination(), ": ", err) log.Error("Freedom: Failed to open connection to ", destination, ": ", err)
return err return err
} }
defer conn.Close() defer conn.Close()
@ -43,21 +44,15 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
readMutex.Lock() readMutex.Lock()
writeMutex.Lock() writeMutex.Lock()
if chunk := firstPacket.Chunk(); chunk != nil { conn.Write(payload.Value)
conn.Write(chunk.Value)
}
if !firstPacket.MoreChunks() { go func() {
v2writer := v2io.NewAdaptiveWriter(conn)
defer v2writer.Release()
v2io.Pipe(input, v2writer)
writeMutex.Unlock() writeMutex.Unlock()
} else { }()
go func() {
v2writer := v2io.NewAdaptiveWriter(conn)
defer v2writer.Release()
v2io.Pipe(input, v2writer)
writeMutex.Unlock()
}()
}
go func() { go func() {
defer readMutex.Unlock() defer readMutex.Unlock()
@ -65,7 +60,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
var reader io.Reader = conn var reader io.Reader = conn
if firstPacket.Destination().IsUDP() { if destination.IsUDP() {
reader = v2net.NewTimeOutReader(16 /* seconds */, conn) reader = v2net.NewTimeOutReader(16 /* seconds */, conn)
} }

View File

@ -33,10 +33,8 @@ func TestSinglePacket(t *testing.T) {
traffic := ray.NewRay() traffic := ray.NewRay()
data2Send := "Data to be sent to remote" data2Send := "Data to be sent to remote"
payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send)) payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send))
packet := v2net.NewPacket(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), port), payload, false)
err = freedom.Dispatch(packet, traffic) go freedom.Dispatch(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), port), payload, traffic)
assert.Error(err).IsNil()
traffic.InboundInput().Close() traffic.InboundInput().Close()
respPayload, err := traffic.InboundOutput().Read() respPayload, err := traffic.InboundOutput().Read()
@ -53,8 +51,7 @@ func TestUnreachableDestination(t *testing.T) {
traffic := ray.NewRay() traffic := ray.NewRay()
data2Send := "Data to be sent to remote" data2Send := "Data to be sent to remote"
payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send)) payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send))
packet := v2net.NewPacket(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), 128), payload, false)
err := freedom.Dispatch(packet, traffic) err := freedom.Dispatch(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), 128), payload, traffic)
assert.Error(err).IsNotNil() assert.Error(err).IsNotNil()
} }

View File

@ -143,8 +143,7 @@ func (this *HttpProxyServer) handleConnect(request *http.Request, destination v2
writer.Write(buffer.Value) writer.Write(buffer.Value)
buffer.Release() buffer.Release()
packet := v2net.NewPacket(destination, nil, true) ray := this.packetDispatcher.DispatchToOutbound(destination)
ray := this.packetDispatcher.DispatchToOutbound(packet)
this.transport(reader, writer, ray) this.transport(reader, writer, ray)
} }
@ -227,8 +226,8 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
request.Write(requestBuffer) request.Write(requestBuffer)
log.Debug("Request to remote:\n", serial.BytesLiteral(requestBuffer.Value)) log.Debug("Request to remote:\n", serial.BytesLiteral(requestBuffer.Value))
packet := v2net.NewPacket(dest, requestBuffer, true) ray := this.packetDispatcher.DispatchToOutbound(dest)
ray := this.packetDispatcher.DispatchToOutbound(packet) ray.InboundInput().Write(requestBuffer)
defer ray.InboundInput().Close() defer ray.InboundInput().Close()
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -2,6 +2,7 @@
package proxy // import "github.com/v2ray/v2ray-core/proxy" package proxy // import "github.com/v2ray/v2ray-core/proxy"
import ( import (
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/ray" "github.com/v2ray/v2ray-core/transport/ray"
) )
@ -26,5 +27,5 @@ type InboundHandler interface {
// An OutboundHandler handles outbound network connection for V2Ray. // An OutboundHandler handles outbound network connection for V2Ray.
type OutboundHandler interface { type OutboundHandler interface {
// Dispatch sends one or more Packets to its destination. // Dispatch sends one or more Packets to its destination.
Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error
} }

View File

@ -115,9 +115,8 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D
log.Access(source, dest, log.AccessAccepted, serial.StringLiteral("")) log.Access(source, dest, log.AccessAccepted, serial.StringLiteral(""))
log.Info("Shadowsocks: Tunnelling request to ", dest) log.Info("Shadowsocks: Tunnelling request to ", dest)
packet := v2net.NewPacket(dest, request.UDPPayload, false) this.udpServer.Dispatch(source, dest, request.UDPPayload, func(destination v2net.Destination, payload *alloc.Buffer) {
this.udpServer.Dispatch(source, packet, func(packet v2net.Packet) { defer payload.Release()
defer packet.Chunk().Release()
response := alloc.NewBuffer().Slice(0, ivLen) response := alloc.NewBuffer().Slice(0, ivLen)
defer response.Release() defer response.Release()
@ -146,7 +145,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D
} }
writer.Write(request.Port.Bytes()) writer.Write(request.Port.Bytes())
writer.Write(packet.Chunk().Value) writer.Write(payload.Value)
if request.OTA { if request.OTA {
respAuth := NewAuthenticator(HeaderKeyGenerator(key, respIv)) respAuth := NewAuthenticator(HeaderKeyGenerator(key, respIv))
@ -198,8 +197,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, serial.StringLiteral("")) log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, serial.StringLiteral(""))
log.Info("Shadowsocks: Tunnelling request to ", dest) log.Info("Shadowsocks: Tunnelling request to ", dest)
packet := v2net.NewPacket(dest, nil, true) ray := this.packetDispatcher.DispatchToOutbound(dest)
ray := this.packetDispatcher.DispatchToOutbound(packet)
var writeFinish sync.Mutex var writeFinish sync.Mutex
writeFinish.Lock() writeFinish.Lock()

View File

@ -206,8 +206,7 @@ func (this *SocksServer) handleSocks5(reader *v2io.BufferedReader, writer *v2io.
dest := request.Destination() dest := request.Destination()
log.Info("Socks: TCP Connect request to ", dest) log.Info("Socks: TCP Connect request to ", dest)
packet := v2net.NewPacket(dest, nil, true) this.transport(reader, writer, dest)
this.transport(reader, writer, packet)
return nil return nil
} }
@ -261,13 +260,12 @@ func (this *SocksServer) handleSocks4(reader *v2io.BufferedReader, writer *v2io.
writer.SetCached(false) writer.SetCached(false)
dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port) dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port)
packet := v2net.NewPacket(dest, nil, true) this.transport(reader, writer, dest)
this.transport(reader, writer, packet)
return nil return nil
} }
func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPacket v2net.Packet) { func (this *SocksServer) transport(reader io.Reader, writer io.Writer, destination v2net.Destination) {
ray := this.packetDispatcher.DispatchToOutbound(firstPacket) ray := this.packetDispatcher.DispatchToOutbound(destination)
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()

View File

@ -42,16 +42,15 @@ func (this *SocksServer) handleUDPPayload(payload *alloc.Buffer, source v2net.De
return return
} }
udpPacket := v2net.NewPacket(request.Destination(), request.Data, false) log.Info("Socks: Send packet to ", request.Destination(), " with ", request.Data.Len(), " bytes")
log.Info("Socks: Send packet to ", udpPacket.Destination(), " with ", request.Data.Len(), " bytes") this.udpServer.Dispatch(source, request.Destination(), request.Data, func(destination v2net.Destination, payload *alloc.Buffer) {
this.udpServer.Dispatch(source, udpPacket, func(packet v2net.Packet) {
response := &protocol.Socks5UDPRequest{ response := &protocol.Socks5UDPRequest{
Fragment: 0, Fragment: 0,
Address: udpPacket.Destination().Address(), Address: request.Destination().Address(),
Port: udpPacket.Destination().Port(), Port: request.Destination().Port(),
Data: packet.Chunk(), Data: payload,
} }
log.Info("Socks: Writing back UDP response with ", response.Data.Len(), " bytes to ", packet.Destination()) log.Info("Socks: Writing back UDP response with ", payload.Len(), " bytes to ", destination)
udpMessage := alloc.NewSmallBuffer().Clear() udpMessage := alloc.NewSmallBuffer().Clear()
response.Write(udpMessage) response.Write(udpMessage)
@ -61,12 +60,12 @@ func (this *SocksServer) handleUDPPayload(payload *alloc.Buffer, source v2net.De
this.udpMutex.RUnlock() this.udpMutex.RUnlock()
return return
} }
nBytes, err := this.udpHub.WriteTo(udpMessage.Value, packet.Destination()) nBytes, err := this.udpHub.WriteTo(udpMessage.Value, destination)
this.udpMutex.RUnlock() this.udpMutex.RUnlock()
udpMessage.Release() udpMessage.Release()
response.Data.Release() response.Data.Release()
if err != nil { if err != nil {
log.Error("Socks: failed to write UDP message (", nBytes, " bytes) to ", packet.Destination(), ": ", err) log.Error("Socks: failed to write UDP message (", nBytes, " bytes) to ", destination, ": ", err)
} }
}) })
} }

View File

@ -29,8 +29,8 @@ func (this *InboundConnectionHandler) Close() {
} }
func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error { func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error {
ray := this.PacketDispatcher.DispatchToOutbound(packet) ray := this.PacketDispatcher.DispatchToOutbound(destination)
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
@ -17,32 +18,28 @@ type OutboundConnectionHandler struct {
ConnOutput io.Writer ConnOutput io.Writer
} }
func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.OutboundRay) error { func (this *OutboundConnectionHandler) Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
input := ray.OutboundInput() input := ray.OutboundInput()
output := ray.OutboundOutput() output := ray.OutboundOutput()
this.Destination = packet.Destination() this.Destination = destination
if packet.Chunk() != nil { this.ConnOutput.Write(payload.Value)
this.ConnOutput.Write(packet.Chunk().Value) payload.Release()
packet.Chunk().Release()
}
if packet.MoreChunks() { writeFinish := &sync.Mutex{}
writeFinish := &sync.Mutex{}
writeFinish.Lock() writeFinish.Lock()
go func() { go func() {
v2writer := v2io.NewAdaptiveWriter(this.ConnOutput) v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
defer v2writer.Release() defer v2writer.Release()
v2io.Pipe(input, v2writer) v2io.Pipe(input, v2writer)
writeFinish.Unlock() writeFinish.Unlock()
input.Release() input.Release()
}() }()
writeFinish.Lock() writeFinish.Lock()
}
v2reader := v2io.NewAdaptiveReader(this.ConnInput) v2reader := v2io.NewAdaptiveReader(this.ConnInput)
defer v2reader.Release() defer v2reader.Release()

View File

@ -135,7 +135,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, serial.StringLiteral("")) log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, serial.StringLiteral(""))
log.Debug("VMessIn: Received request for ", request.Destination()) log.Debug("VMessIn: Received request for ", request.Destination())
ray := this.packetDispatcher.DispatchToOutbound(v2net.NewPacket(request.Destination(), nil, true)) ray := this.packetDispatcher.DispatchToOutbound(request.Destination())
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()
var readFinish, writeFinish sync.Mutex var readFinish, writeFinish sync.Mutex

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -13,6 +14,7 @@ import (
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal" "github.com/v2ray/v2ray-core/proxy/internal"
vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
"github.com/v2ray/v2ray-core/transport/dialer"
"github.com/v2ray/v2ray-core/transport/ray" "github.com/v2ray/v2ray-core/transport/ray"
) )
@ -20,50 +22,33 @@ type VMessOutboundHandler struct {
receiverManager *ReceiverManager receiverManager *ReceiverManager
} }
func (this *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
vNextAddress, vNextUser := this.receiverManager.PickReceiver() destination, vNextUser := this.receiverManager.PickReceiver()
command := proto.RequestCommandTCP command := proto.RequestCommandTCP
if firstPacket.Destination().IsUDP() { if target.IsUDP() {
command = proto.RequestCommandUDP command = proto.RequestCommandUDP
} }
request := &proto.RequestHeader{ request := &proto.RequestHeader{
Version: raw.Version, Version: raw.Version,
User: vNextUser, User: vNextUser,
Command: command, Command: command,
Address: firstPacket.Destination().Address(), Address: target.Address(),
Port: firstPacket.Destination().Port(), Port: target.Port(),
} }
if command == proto.RequestCommandUDP { if command == proto.RequestCommandUDP {
request.Option |= proto.RequestOptionChunkStream request.Option |= proto.RequestOptionChunkStream
} }
return this.startCommunicate(request, vNextAddress, ray, firstPacket) conn, err := dialer.Dial(destination)
}
func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader, dest v2net.Destination, ray ray.OutboundRay, firstPacket v2net.Packet) error {
var destIP net.IP
if dest.Address().IsIPv4() || dest.Address().IsIPv6() {
destIP = dest.Address().IP()
} else {
ips, err := net.LookupIP(dest.Address().Domain())
if err != nil {
return err
}
destIP = ips[0]
}
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: destIP,
Port: int(dest.Port()),
})
if err != nil { if err != nil {
log.Error("Failed to open ", dest, ": ", err) log.Error("Failed to open ", destination, ": ", err)
if ray != nil { if ray != nil {
ray.OutboundOutput().Close() ray.OutboundOutput().Close()
} }
return err return err
} }
log.Info("VMessOut: Tunneling request to ", request.Address, " via ", dest) log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
defer conn.Close() defer conn.Close()
@ -76,46 +61,43 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader,
session := raw.NewClientSession(proto.DefaultIDHash) session := raw.NewClientSession(proto.DefaultIDHash)
go this.handleRequest(session, conn, request, firstPacket, input, &requestFinish) go this.handleRequest(session, conn, request, payload, input, &requestFinish)
go this.handleResponse(session, conn, request, dest, output, &responseFinish) go this.handleResponse(session, conn, request, destination, output, &responseFinish)
requestFinish.Lock() requestFinish.Lock()
conn.CloseWrite() if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.CloseWrite()
}
responseFinish.Lock() responseFinish.Lock()
output.Close() output.Close()
input.Release() input.Release()
return nil return nil
} }
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input v2io.Reader, finish *sync.Mutex) { func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
defer finish.Unlock() defer finish.Unlock()
defer payload.Release()
writer := v2io.NewBufferedWriter(conn) writer := v2io.NewBufferedWriter(conn)
defer writer.Release() defer writer.Release()
session.EncodeRequestHeader(request, writer) session.EncodeRequestHeader(request, writer)
// Send first packet of payload together with request, in favor of small requests.
firstChunk := firstPacket.Chunk()
moreChunks := firstPacket.MoreChunks()
if request.Option.IsChunkStream() { if request.Option.IsChunkStream() {
vmessio.Authenticate(firstChunk) vmessio.Authenticate(payload)
} }
bodyWriter := session.EncodeRequestBody(writer) bodyWriter := session.EncodeRequestBody(writer)
bodyWriter.Write(firstChunk.Value) bodyWriter.Write(payload.Value)
firstChunk.Release()
writer.SetCached(false) writer.SetCached(false)
if moreChunks { var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) if request.Option.IsChunkStream() {
if request.Option.IsChunkStream() { streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
}
v2io.Pipe(input, streamWriter)
streamWriter.Release()
} }
v2io.Pipe(input, streamWriter)
streamWriter.Release()
return return
} }

View File

@ -107,7 +107,7 @@ func TestVMessInAndOut(t *testing.T) {
assert.Error(err).IsNil() assert.Error(err).IsNil()
dest := v2net.TCPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}), 80) dest := v2net.TCPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}), 80)
ich.Communicate(v2net.NewPacket(dest, nil, true)) ich.Communicate(dest)
assert.Bytes(ichConnInput).Equals(ochConnOutput.Bytes()) assert.Bytes(ichConnInput).Equals(ochConnOutput.Bytes())
assert.Bytes(ichConnOutput.Bytes()).Equals(ochConnInput) assert.Bytes(ichConnOutput.Bytes()).Equals(ochConnInput)
} }

View File

@ -173,16 +173,14 @@ func (this *Point) Start() error {
// Dispatches a Packet to an OutboundConnection. // Dispatches a Packet to an OutboundConnection.
// The packet will be passed through the router (if configured), and then sent to an outbound // The packet will be passed through the router (if configured), and then sent to an outbound
// connection with matching tag. // connection with matching tag.
func (this *Point) DispatchToOutbound(context app.Context, packet v2net.Packet) ray.InboundRay { func (this *Point) DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay {
direct := ray.NewRay() direct := ray.NewRay()
dest := packet.Destination()
dispatcher := this.och dispatcher := this.och
if this.router != nil { if this.router != nil {
if tag, err := this.router.TakeDetour(dest); err == nil { if tag, err := this.router.TakeDetour(destination); err == nil {
if handler, found := this.odh[tag]; found { if handler, found := this.odh[tag]; found {
log.Info("Point: Taking detour [", tag, "] for [", dest, "]", tag, dest) log.Info("Point: Taking detour [", tag, "] for [", destination, "]")
dispatcher = handler dispatcher = handler
} else { } else {
log.Warning("Point: Unable to find routing destination: ", tag) log.Warning("Point: Unable to find routing destination: ", tag)
@ -190,34 +188,19 @@ func (this *Point) DispatchToOutbound(context app.Context, packet v2net.Packet)
} }
} }
go this.FilterPacketAndDispatch(packet, direct, dispatcher) go this.FilterPacketAndDispatch(destination, direct, dispatcher)
return direct return direct
} }
func (this *Point) FilterPacketAndDispatch(packet v2net.Packet, link ray.OutboundRay, dispatcher proxy.OutboundHandler) { func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
// Filter empty packets payload, err := link.OutboundInput().Read()
chunk := packet.Chunk() if err != nil {
moreChunks := packet.MoreChunks()
changed := false
var err error
for chunk == nil && moreChunks {
changed = true
chunk, err = link.OutboundInput().Read()
if err != nil {
moreChunks = false
}
}
if chunk == nil && !moreChunks {
log.Info("Point: No payload to dispatch, stopping dispatching now.") log.Info("Point: No payload to dispatch, stopping dispatching now.")
link.OutboundOutput().Close() link.OutboundOutput().Close()
link.OutboundInput().Release()
return return
} }
dispatcher.Dispatch(destination, payload, link)
if changed {
packet = v2net.NewPacket(packet.Destination(), chunk, moreChunks)
}
dispatcher.Dispatch(packet, link)
} }
func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) { func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) {

View File

@ -4,11 +4,12 @@ import (
"sync" "sync"
"github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/common/alloc"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/ray" "github.com/v2ray/v2ray-core/transport/ray"
) )
type UDPResponseCallback func(packet v2net.Packet) type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
type connEntry struct { type connEntry struct {
inboundRay ray.InboundRay inboundRay ray.InboundRay
@ -28,24 +29,26 @@ func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
} }
} }
func (this *UDPServer) locateExistingAndDispatch(dest string, packet v2net.Packet) bool { func (this *UDPServer) locateExistingAndDispatch(dest string, payload *alloc.Buffer) bool {
this.RLock() this.RLock()
defer this.RUnlock() defer this.RUnlock()
if entry, found := this.conns[dest]; found { if entry, found := this.conns[dest]; found {
entry.inboundRay.InboundInput().Write(packet.Chunk()) entry.inboundRay.InboundInput().Write(payload)
return true return true
} }
return false return false
} }
func (this *UDPServer) Dispatch(source v2net.Destination, packet v2net.Packet, callback UDPResponseCallback) { func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
destString := source.String() + "-" + packet.Destination().NetAddr() destString := source.String() + "-" + destination.NetAddr()
if this.locateExistingAndDispatch(destString, packet) { if this.locateExistingAndDispatch(destString, payload) {
return return
} }
this.Lock() this.Lock()
inboundRay := this.packetDispatcher.DispatchToOutbound(v2net.NewPacket(packet.Destination(), packet.Chunk(), true)) inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
inboundRay.InboundInput().Write(payload)
this.conns[destString] = &connEntry{ this.conns[destString] = &connEntry{
inboundRay: inboundRay, inboundRay: inboundRay,
callback: callback, callback: callback,
@ -60,7 +63,7 @@ func (this *UDPServer) handleConnection(destString string, inboundRay ray.Inboun
if err != nil { if err != nil {
break break
} }
callback(v2net.NewPacket(source, data, false)) callback(source, data)
} }
this.Lock() this.Lock()
delete(this.conns, destString) delete(this.conns, destString)