From 42907ff2e8b11d5ed56275ae47e6befb38629d71 Mon Sep 17 00:00:00 2001 From: v2ray Date: Tue, 26 Apr 2016 00:13:26 +0200 Subject: [PATCH] Remove v2net.Packet --- app/dispatcher/dispatcher.go | 8 +-- app/dispatcher/testing/dispatcher.go | 12 ++--- common/net/packet.go | 47 ------------------ proxy/blackhole/blackhole.go | 5 +- proxy/dokodemo/dokodemo.go | 13 ++--- proxy/freedom/freedom.go | 33 ++++++------- proxy/freedom/freedom_test.go | 7 +-- proxy/http/http.go | 7 ++- proxy/proxy.go | 3 +- proxy/shadowsocks/shadowsocks.go | 10 ++-- proxy/socks/socks.go | 10 ++-- proxy/socks/udp.go | 17 +++---- proxy/testing/mocks/inboundhandler.go | 4 +- proxy/testing/mocks/outboundhandler.go | 33 ++++++------- proxy/vmess/inbound/inbound.go | 2 +- proxy/vmess/outbound/outbound.go | 68 ++++++++++---------------- proxy/vmess/vmess_test.go | 2 +- shell/point/point.go | 35 ++++--------- transport/hub/udp_server.go | 19 ++++--- 19 files changed, 119 insertions(+), 216 deletions(-) delete mode 100644 common/net/packet.go diff --git a/app/dispatcher/dispatcher.go b/app/dispatcher/dispatcher.go index 9ea9e461d..b1103e0a0 100644 --- a/app/dispatcher/dispatcher.go +++ b/app/dispatcher/dispatcher.go @@ -12,11 +12,11 @@ const ( // PacketDispatcher dispatch a packet and possibly further network payload to its destination. type PacketDispatcher interface { - DispatchToOutbound(packet v2net.Packet) ray.InboundRay + DispatchToOutbound(destination v2net.Destination) ray.InboundRay } type packetDispatcherWithContext interface { - DispatchToOutbound(context app.Context, packet v2net.Packet) ray.InboundRay + DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay } type contextedPacketDispatcher struct { @@ -24,8 +24,8 @@ type contextedPacketDispatcher struct { packetDispatcher packetDispatcherWithContext } -func (this *contextedPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay { - return this.packetDispatcher.DispatchToOutbound(this.context, packet) +func (this *contextedPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay { + return this.packetDispatcher.DispatchToOutbound(this.context, destination) } func init() { diff --git a/app/dispatcher/testing/dispatcher.go b/app/dispatcher/testing/dispatcher.go index 532e93fb2..2ae1f1a03 100644 --- a/app/dispatcher/testing/dispatcher.go +++ b/app/dispatcher/testing/dispatcher.go @@ -7,12 +7,12 @@ import ( type TestPacketDispatcher struct { Destination chan v2net.Destination - Handler func(packet v2net.Packet, traffic ray.OutboundRay) + Handler func(destination v2net.Destination, traffic ray.OutboundRay) } -func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.OutboundRay)) *TestPacketDispatcher { +func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic ray.OutboundRay)) *TestPacketDispatcher { if handler == nil { - handler = func(packet v2net.Packet, traffic ray.OutboundRay) { + handler = func(destination v2net.Destination, traffic ray.OutboundRay) { for { payload, err := traffic.OutboundInput().Read() if err != nil { @@ -29,10 +29,10 @@ func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.Outbo } } -func (this *TestPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay { +func (this *TestPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay { traffic := ray.NewRay() - this.Destination <- packet.Destination() - go this.Handler(packet, traffic) + this.Destination <- destination + go this.Handler(destination, traffic) return traffic } diff --git a/common/net/packet.go b/common/net/packet.go deleted file mode 100644 index 721548243..000000000 --- a/common/net/packet.go +++ /dev/null @@ -1,47 +0,0 @@ -package net - -import ( - "github.com/v2ray/v2ray-core/common" - "github.com/v2ray/v2ray-core/common/alloc" -) - -// Packet is a network packet to be sent to destination. -type Packet interface { - common.Releasable - - Destination() Destination - Chunk() *alloc.Buffer // First chunk of this commnunication - MoreChunks() bool -} - -// NewPacket creates a new Packet with given destination and payload. -func NewPacket(dest Destination, firstChunk *alloc.Buffer, moreChunks bool) Packet { - return &packetImpl{ - dest: dest, - data: firstChunk, - moreData: moreChunks, - } -} - -type packetImpl struct { - dest Destination - data *alloc.Buffer - moreData bool -} - -func (packet *packetImpl) Destination() Destination { - return packet.dest -} - -func (packet *packetImpl) Chunk() *alloc.Buffer { - return packet.data -} - -func (packet *packetImpl) MoreChunks() bool { - return packet.moreData -} - -func (packet *packetImpl) Release() { - packet.data.Release() - packet.data = nil -} diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index 48e371213..95afd5b54 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -2,6 +2,7 @@ package blackhole import ( "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" @@ -16,8 +17,8 @@ func NewBlackHole() *BlackHole { return &BlackHole{} } -func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { - firstPacket.Release() +func (this *BlackHole) Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error { + payload.Release() ray.OutboundOutput().Close() ray.OutboundOutput().Release() diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 971ed4a19..3b3ea0db8 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -95,16 +95,14 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error { } func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) { - packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), payload, false) - this.udpServer.Dispatch(dest, packet, func(packet v2net.Packet) { - defer packet.Chunk().Release() + this.udpServer.Dispatch(dest, v2net.UDPDestination(this.address, this.port), payload, func(destination v2net.Destination, payload *alloc.Buffer) { + defer payload.Release() this.udpMutex.RLock() + defer this.udpMutex.RUnlock() if !this.accepting { - this.udpMutex.RUnlock() return } - this.udpHub.WriteTo(packet.Chunk().Value, packet.Destination()) - this.udpMutex.RUnlock() + this.udpHub.WriteTo(payload.Value, destination) }) } @@ -123,8 +121,7 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error { func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) { defer conn.Close() - packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true) - ray := this.packetDispatcher.DispatchToOutbound(packet) + ray := this.packetDispatcher.DispatchToOutbound(v2net.TCPDestination(this.address, this.port)) defer ray.InboundOutput().Release() var inputFinish, outputFinish sync.Mutex diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index f4b7069fb..bcd6cad47 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -5,6 +5,7 @@ import ( "net" "sync" + "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -16,15 +17,15 @@ import ( type FreedomConnection struct { } -func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { - log.Info("Freedom: Opening connection to ", firstPacket.Destination()) +func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error { + log.Info("Freedom: Opening connection to ", destination) - defer firstPacket.Release() + defer payload.Release() defer ray.OutboundInput().Release() var conn net.Conn err := retry.Timed(5, 100).On(func() error { - rawConn, err := dialer.Dial(firstPacket.Destination()) + rawConn, err := dialer.Dial(destination) if err != nil { return err } @@ -32,7 +33,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou return nil }) if err != nil { - log.Error("Freedom: Failed to open connection to ", firstPacket.Destination(), ": ", err) + log.Error("Freedom: Failed to open connection to ", destination, ": ", err) return err } defer conn.Close() @@ -43,21 +44,15 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou readMutex.Lock() writeMutex.Lock() - if chunk := firstPacket.Chunk(); chunk != nil { - conn.Write(chunk.Value) - } + conn.Write(payload.Value) - if !firstPacket.MoreChunks() { + go func() { + v2writer := v2io.NewAdaptiveWriter(conn) + defer v2writer.Release() + + v2io.Pipe(input, v2writer) writeMutex.Unlock() - } else { - go func() { - v2writer := v2io.NewAdaptiveWriter(conn) - defer v2writer.Release() - - v2io.Pipe(input, v2writer) - writeMutex.Unlock() - }() - } + }() go func() { defer readMutex.Unlock() @@ -65,7 +60,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou var reader io.Reader = conn - if firstPacket.Destination().IsUDP() { + if destination.IsUDP() { reader = v2net.NewTimeOutReader(16 /* seconds */, conn) } diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index 2ed95db88..1838ca56b 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -33,10 +33,8 @@ func TestSinglePacket(t *testing.T) { traffic := ray.NewRay() data2Send := "Data to be sent to remote" payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send)) - packet := v2net.NewPacket(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), port), payload, false) - err = freedom.Dispatch(packet, traffic) - assert.Error(err).IsNil() + go freedom.Dispatch(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), port), payload, traffic) traffic.InboundInput().Close() respPayload, err := traffic.InboundOutput().Read() @@ -53,8 +51,7 @@ func TestUnreachableDestination(t *testing.T) { traffic := ray.NewRay() data2Send := "Data to be sent to remote" payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send)) - packet := v2net.NewPacket(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), 128), payload, false) - err := freedom.Dispatch(packet, traffic) + err := freedom.Dispatch(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), 128), payload, traffic) assert.Error(err).IsNotNil() } diff --git a/proxy/http/http.go b/proxy/http/http.go index 8c53bc3af..48e6ee298 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -143,8 +143,7 @@ func (this *HttpProxyServer) handleConnect(request *http.Request, destination v2 writer.Write(buffer.Value) buffer.Release() - packet := v2net.NewPacket(destination, nil, true) - ray := this.packetDispatcher.DispatchToOutbound(packet) + ray := this.packetDispatcher.DispatchToOutbound(destination) this.transport(reader, writer, ray) } @@ -227,8 +226,8 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D request.Write(requestBuffer) log.Debug("Request to remote:\n", serial.BytesLiteral(requestBuffer.Value)) - packet := v2net.NewPacket(dest, requestBuffer, true) - ray := this.packetDispatcher.DispatchToOutbound(packet) + ray := this.packetDispatcher.DispatchToOutbound(dest) + ray.InboundInput().Write(requestBuffer) defer ray.InboundInput().Close() var wg sync.WaitGroup diff --git a/proxy/proxy.go b/proxy/proxy.go index 1960e72de..ac703f7f5 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -2,6 +2,7 @@ package proxy // import "github.com/v2ray/v2ray-core/proxy" import ( + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -26,5 +27,5 @@ type InboundHandler interface { // An OutboundHandler handles outbound network connection for V2Ray. type OutboundHandler interface { // Dispatch sends one or more Packets to its destination. - Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error + Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error } diff --git a/proxy/shadowsocks/shadowsocks.go b/proxy/shadowsocks/shadowsocks.go index 70b4d081b..441d713a5 100644 --- a/proxy/shadowsocks/shadowsocks.go +++ b/proxy/shadowsocks/shadowsocks.go @@ -115,9 +115,8 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D log.Access(source, dest, log.AccessAccepted, serial.StringLiteral("")) log.Info("Shadowsocks: Tunnelling request to ", dest) - packet := v2net.NewPacket(dest, request.UDPPayload, false) - this.udpServer.Dispatch(source, packet, func(packet v2net.Packet) { - defer packet.Chunk().Release() + this.udpServer.Dispatch(source, dest, request.UDPPayload, func(destination v2net.Destination, payload *alloc.Buffer) { + defer payload.Release() response := alloc.NewBuffer().Slice(0, ivLen) defer response.Release() @@ -146,7 +145,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D } writer.Write(request.Port.Bytes()) - writer.Write(packet.Chunk().Value) + writer.Write(payload.Value) if request.OTA { respAuth := NewAuthenticator(HeaderKeyGenerator(key, respIv)) @@ -198,8 +197,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, serial.StringLiteral("")) log.Info("Shadowsocks: Tunnelling request to ", dest) - packet := v2net.NewPacket(dest, nil, true) - ray := this.packetDispatcher.DispatchToOutbound(packet) + ray := this.packetDispatcher.DispatchToOutbound(dest) var writeFinish sync.Mutex writeFinish.Lock() diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 24de3d2cd..b9b949cd9 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -206,8 +206,7 @@ func (this *SocksServer) handleSocks5(reader *v2io.BufferedReader, writer *v2io. dest := request.Destination() log.Info("Socks: TCP Connect request to ", dest) - packet := v2net.NewPacket(dest, nil, true) - this.transport(reader, writer, packet) + this.transport(reader, writer, dest) return nil } @@ -261,13 +260,12 @@ func (this *SocksServer) handleSocks4(reader *v2io.BufferedReader, writer *v2io. writer.SetCached(false) dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port) - packet := v2net.NewPacket(dest, nil, true) - this.transport(reader, writer, packet) + this.transport(reader, writer, dest) return nil } -func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPacket v2net.Packet) { - ray := this.packetDispatcher.DispatchToOutbound(firstPacket) +func (this *SocksServer) transport(reader io.Reader, writer io.Writer, destination v2net.Destination) { + ray := this.packetDispatcher.DispatchToOutbound(destination) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/socks/udp.go b/proxy/socks/udp.go index 561c18d48..51aa6ca52 100644 --- a/proxy/socks/udp.go +++ b/proxy/socks/udp.go @@ -42,16 +42,15 @@ func (this *SocksServer) handleUDPPayload(payload *alloc.Buffer, source v2net.De return } - udpPacket := v2net.NewPacket(request.Destination(), request.Data, false) - log.Info("Socks: Send packet to ", udpPacket.Destination(), " with ", request.Data.Len(), " bytes") - this.udpServer.Dispatch(source, udpPacket, func(packet v2net.Packet) { + log.Info("Socks: Send packet to ", request.Destination(), " with ", request.Data.Len(), " bytes") + this.udpServer.Dispatch(source, request.Destination(), request.Data, func(destination v2net.Destination, payload *alloc.Buffer) { response := &protocol.Socks5UDPRequest{ Fragment: 0, - Address: udpPacket.Destination().Address(), - Port: udpPacket.Destination().Port(), - Data: packet.Chunk(), + Address: request.Destination().Address(), + Port: request.Destination().Port(), + Data: payload, } - log.Info("Socks: Writing back UDP response with ", response.Data.Len(), " bytes to ", packet.Destination()) + log.Info("Socks: Writing back UDP response with ", payload.Len(), " bytes to ", destination) udpMessage := alloc.NewSmallBuffer().Clear() response.Write(udpMessage) @@ -61,12 +60,12 @@ func (this *SocksServer) handleUDPPayload(payload *alloc.Buffer, source v2net.De this.udpMutex.RUnlock() return } - nBytes, err := this.udpHub.WriteTo(udpMessage.Value, packet.Destination()) + nBytes, err := this.udpHub.WriteTo(udpMessage.Value, destination) this.udpMutex.RUnlock() udpMessage.Release() response.Data.Release() if err != nil { - log.Error("Socks: failed to write UDP message (", nBytes, " bytes) to ", packet.Destination(), ": ", err) + log.Error("Socks: failed to write UDP message (", nBytes, " bytes) to ", destination, ": ", err) } }) } diff --git a/proxy/testing/mocks/inboundhandler.go b/proxy/testing/mocks/inboundhandler.go index b528eefde..37cfce0bf 100644 --- a/proxy/testing/mocks/inboundhandler.go +++ b/proxy/testing/mocks/inboundhandler.go @@ -29,8 +29,8 @@ func (this *InboundConnectionHandler) Close() { } -func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error { - ray := this.PacketDispatcher.DispatchToOutbound(packet) +func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error { + ray := this.PacketDispatcher.DispatchToOutbound(destination) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/testing/mocks/outboundhandler.go b/proxy/testing/mocks/outboundhandler.go index 7f796a539..021684cff 100644 --- a/proxy/testing/mocks/outboundhandler.go +++ b/proxy/testing/mocks/outboundhandler.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" @@ -17,32 +18,28 @@ type OutboundConnectionHandler struct { ConnOutput io.Writer } -func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.OutboundRay) error { +func (this *OutboundConnectionHandler) Dispatch(destination v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error { input := ray.OutboundInput() output := ray.OutboundOutput() - this.Destination = packet.Destination() - if packet.Chunk() != nil { - this.ConnOutput.Write(packet.Chunk().Value) - packet.Chunk().Release() - } + this.Destination = destination + this.ConnOutput.Write(payload.Value) + payload.Release() - if packet.MoreChunks() { - writeFinish := &sync.Mutex{} + writeFinish := &sync.Mutex{} - writeFinish.Lock() + writeFinish.Lock() - go func() { - v2writer := v2io.NewAdaptiveWriter(this.ConnOutput) - defer v2writer.Release() + go func() { + v2writer := v2io.NewAdaptiveWriter(this.ConnOutput) + defer v2writer.Release() - v2io.Pipe(input, v2writer) - writeFinish.Unlock() - input.Release() - }() + v2io.Pipe(input, v2writer) + writeFinish.Unlock() + input.Release() + }() - writeFinish.Lock() - } + writeFinish.Lock() v2reader := v2io.NewAdaptiveReader(this.ConnInput) defer v2reader.Release() diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index dc2290c23..9ea6942c9 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -135,7 +135,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, serial.StringLiteral("")) log.Debug("VMessIn: Received request for ", request.Destination()) - ray := this.packetDispatcher.DispatchToOutbound(v2net.NewPacket(request.Destination(), nil, true)) + ray := this.packetDispatcher.DispatchToOutbound(request.Destination()) input := ray.InboundInput() output := ray.InboundOutput() var readFinish, writeFinish sync.Mutex diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 2914eb528..1c470a9c1 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -13,6 +14,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" + "github.com/v2ray/v2ray-core/transport/dialer" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -20,50 +22,33 @@ type VMessOutboundHandler struct { receiverManager *ReceiverManager } -func (this *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { - vNextAddress, vNextUser := this.receiverManager.PickReceiver() +func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error { + destination, vNextUser := this.receiverManager.PickReceiver() command := proto.RequestCommandTCP - if firstPacket.Destination().IsUDP() { + if target.IsUDP() { command = proto.RequestCommandUDP } request := &proto.RequestHeader{ Version: raw.Version, User: vNextUser, Command: command, - Address: firstPacket.Destination().Address(), - Port: firstPacket.Destination().Port(), + Address: target.Address(), + Port: target.Port(), } if command == proto.RequestCommandUDP { request.Option |= proto.RequestOptionChunkStream } - return this.startCommunicate(request, vNextAddress, ray, firstPacket) -} - -func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader, dest v2net.Destination, ray ray.OutboundRay, firstPacket v2net.Packet) error { - var destIP net.IP - if dest.Address().IsIPv4() || dest.Address().IsIPv6() { - destIP = dest.Address().IP() - } else { - ips, err := net.LookupIP(dest.Address().Domain()) - if err != nil { - return err - } - destIP = ips[0] - } - conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ - IP: destIP, - Port: int(dest.Port()), - }) + conn, err := dialer.Dial(destination) if err != nil { - log.Error("Failed to open ", dest, ": ", err) + log.Error("Failed to open ", destination, ": ", err) if ray != nil { ray.OutboundOutput().Close() } return err } - log.Info("VMessOut: Tunneling request to ", request.Address, " via ", dest) + log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination) defer conn.Close() @@ -76,46 +61,43 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader, session := raw.NewClientSession(proto.DefaultIDHash) - go this.handleRequest(session, conn, request, firstPacket, input, &requestFinish) - go this.handleResponse(session, conn, request, dest, output, &responseFinish) + go this.handleRequest(session, conn, request, payload, input, &requestFinish) + go this.handleResponse(session, conn, request, destination, output, &responseFinish) requestFinish.Lock() - conn.CloseWrite() + if tcpConn, ok := conn.(*net.TCPConn); ok { + tcpConn.CloseWrite() + } + responseFinish.Lock() output.Close() input.Release() return nil } -func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input v2io.Reader, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) { defer finish.Unlock() + defer payload.Release() writer := v2io.NewBufferedWriter(conn) defer writer.Release() session.EncodeRequestHeader(request, writer) - // Send first packet of payload together with request, in favor of small requests. - firstChunk := firstPacket.Chunk() - moreChunks := firstPacket.MoreChunks() - if request.Option.IsChunkStream() { - vmessio.Authenticate(firstChunk) + vmessio.Authenticate(payload) } bodyWriter := session.EncodeRequestBody(writer) - bodyWriter.Write(firstChunk.Value) - firstChunk.Release() + bodyWriter.Write(payload.Value) writer.SetCached(false) - if moreChunks { - var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) - if request.Option.IsChunkStream() { - streamWriter = vmessio.NewAuthChunkWriter(streamWriter) - } - v2io.Pipe(input, streamWriter) - streamWriter.Release() + var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) + if request.Option.IsChunkStream() { + streamWriter = vmessio.NewAuthChunkWriter(streamWriter) } + v2io.Pipe(input, streamWriter) + streamWriter.Release() return } diff --git a/proxy/vmess/vmess_test.go b/proxy/vmess/vmess_test.go index bd7aeba6d..c2f22e707 100644 --- a/proxy/vmess/vmess_test.go +++ b/proxy/vmess/vmess_test.go @@ -107,7 +107,7 @@ func TestVMessInAndOut(t *testing.T) { assert.Error(err).IsNil() dest := v2net.TCPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}), 80) - ich.Communicate(v2net.NewPacket(dest, nil, true)) + ich.Communicate(dest) assert.Bytes(ichConnInput).Equals(ochConnOutput.Bytes()) assert.Bytes(ichConnOutput.Bytes()).Equals(ochConnInput) } diff --git a/shell/point/point.go b/shell/point/point.go index fcff87d45..cc810469c 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -173,16 +173,14 @@ func (this *Point) Start() error { // Dispatches a Packet to an OutboundConnection. // The packet will be passed through the router (if configured), and then sent to an outbound // connection with matching tag. -func (this *Point) DispatchToOutbound(context app.Context, packet v2net.Packet) ray.InboundRay { +func (this *Point) DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay { direct := ray.NewRay() - dest := packet.Destination() - dispatcher := this.och if this.router != nil { - if tag, err := this.router.TakeDetour(dest); err == nil { + if tag, err := this.router.TakeDetour(destination); err == nil { if handler, found := this.odh[tag]; found { - log.Info("Point: Taking detour [", tag, "] for [", dest, "]", tag, dest) + log.Info("Point: Taking detour [", tag, "] for [", destination, "]") dispatcher = handler } else { log.Warning("Point: Unable to find routing destination: ", tag) @@ -190,34 +188,19 @@ func (this *Point) DispatchToOutbound(context app.Context, packet v2net.Packet) } } - go this.FilterPacketAndDispatch(packet, direct, dispatcher) + go this.FilterPacketAndDispatch(destination, direct, dispatcher) return direct } -func (this *Point) FilterPacketAndDispatch(packet v2net.Packet, link ray.OutboundRay, dispatcher proxy.OutboundHandler) { - // Filter empty packets - chunk := packet.Chunk() - moreChunks := packet.MoreChunks() - changed := false - var err error - for chunk == nil && moreChunks { - changed = true - chunk, err = link.OutboundInput().Read() - if err != nil { - moreChunks = false - } - } - if chunk == nil && !moreChunks { +func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) { + payload, err := link.OutboundInput().Read() + if err != nil { log.Info("Point: No payload to dispatch, stopping dispatching now.") link.OutboundOutput().Close() + link.OutboundInput().Release() return } - - if changed { - packet = v2net.NewPacket(packet.Destination(), chunk, moreChunks) - } - - dispatcher.Dispatch(packet, link) + dispatcher.Dispatch(destination, payload, link) } func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) { diff --git a/transport/hub/udp_server.go b/transport/hub/udp_server.go index 7e61262ed..1fd930929 100644 --- a/transport/hub/udp_server.go +++ b/transport/hub/udp_server.go @@ -4,11 +4,12 @@ import ( "sync" "github.com/v2ray/v2ray-core/app/dispatcher" + "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport/ray" ) -type UDPResponseCallback func(packet v2net.Packet) +type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer) type connEntry struct { inboundRay ray.InboundRay @@ -28,24 +29,26 @@ func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer { } } -func (this *UDPServer) locateExistingAndDispatch(dest string, packet v2net.Packet) bool { +func (this *UDPServer) locateExistingAndDispatch(dest string, payload *alloc.Buffer) bool { this.RLock() defer this.RUnlock() if entry, found := this.conns[dest]; found { - entry.inboundRay.InboundInput().Write(packet.Chunk()) + entry.inboundRay.InboundInput().Write(payload) return true } return false } -func (this *UDPServer) Dispatch(source v2net.Destination, packet v2net.Packet, callback UDPResponseCallback) { - destString := source.String() + "-" + packet.Destination().NetAddr() - if this.locateExistingAndDispatch(destString, packet) { +func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) { + destString := source.String() + "-" + destination.NetAddr() + if this.locateExistingAndDispatch(destString, payload) { return } this.Lock() - inboundRay := this.packetDispatcher.DispatchToOutbound(v2net.NewPacket(packet.Destination(), packet.Chunk(), true)) + inboundRay := this.packetDispatcher.DispatchToOutbound(destination) + inboundRay.InboundInput().Write(payload) + this.conns[destString] = &connEntry{ inboundRay: inboundRay, callback: callback, @@ -60,7 +63,7 @@ func (this *UDPServer) handleConnection(destString string, inboundRay ray.Inboun if err != nil { break } - callback(v2net.NewPacket(source, data, false)) + callback(source, data) } this.Lock() delete(this.conns, destString)