From 4874cd54a4071a8e249e86a49bf33c9f95a4a1ad Mon Sep 17 00:00:00 2001 From: V2Ray Date: Tue, 22 Sep 2015 14:45:03 +0200 Subject: [PATCH] Introduce Packet to unify TCP and UDP communication --- common/net/packet.go | 53 ++++++++++++++++++++++++++++++++ point.go | 6 ++-- proxy/freedom/freedom.go | 31 +++++++++++++------ proxy/freedom/freedomfactory.go | 4 +-- proxy/socks/socks.go | 2 +- proxy/vmess/vmess_test.go | 2 +- proxy/vmess/vmessin.go | 2 +- proxy/vmess/vmessout.go | 39 +++++++++++++++-------- ray.go | 16 ++++++---- release/config/out_vmess.json | 2 +- testing/mocks/inboundhandler.go | 4 +-- testing/mocks/outboundhandler.go | 4 +-- 12 files changed, 124 insertions(+), 41 deletions(-) create mode 100644 common/net/packet.go diff --git a/common/net/packet.go b/common/net/packet.go new file mode 100644 index 000000000..422166691 --- /dev/null +++ b/common/net/packet.go @@ -0,0 +1,53 @@ +package net + +type Packet interface { + Destination() Destination + Chunk() []byte // First chunk of this commnunication + MoreChunks() bool +} + +func NewTCPPacket(dest Destination) *TCPPacket { + return &TCPPacket{ + basePacket: basePacket{destination: dest}, + } +} + +func NewUDPPacket(dest Destination, data []byte) *UDPPacket { + return &UDPPacket{ + basePacket: basePacket{destination: dest}, + data: data, + } +} + +type basePacket struct { + destination Destination +} + +func (base basePacket) Destination() Destination { + return base.destination +} + +type TCPPacket struct { + basePacket +} + +func (packet *TCPPacket) Chunk() []byte { + return nil +} + +func (packet *TCPPacket) MoreChunks() bool { + return true +} + +type UDPPacket struct { + basePacket + data []byte +} + +func (packet *UDPPacket) Chunk() []byte { + return packet.data +} + +func (packet *UDPPacket) MoreChunks() bool { + return false +} diff --git a/point.go b/point.go index 7713a1833..a03ffeb12 100644 --- a/point.go +++ b/point.go @@ -64,7 +64,7 @@ type InboundConnectionHandler interface { } type OutboundConnectionHandlerFactory interface { - Create(VP *Point, config []byte, dest v2net.Destination) (OutboundConnectionHandler, error) + Create(VP *Point, config []byte, firstPacket v2net.Packet) (OutboundConnectionHandler, error) } type OutboundConnectionHandler interface { @@ -85,10 +85,10 @@ func (vp *Point) Start() error { return nil } -func (vp *Point) NewInboundConnectionAccepted(destination v2net.Destination) InboundRay { +func (p *Point) DispatchToOutbound(packet v2net.Packet) InboundRay { ray := NewRay() // TODO: handle error - och, _ := vp.ochFactory.Create(vp, vp.ochConfig, destination) + och, _ := p.ochFactory.Create(p, p.ochConfig, packet) _ = och.Start(ray) return ray } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index a4a638288..a2999b54e 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -9,25 +9,38 @@ import ( ) type FreedomConnection struct { - dest v2net.Destination + packet v2net.Packet } -func NewFreedomConnection(dest v2net.Destination) *FreedomConnection { +func NewFreedomConnection(firstPacket v2net.Packet) *FreedomConnection { return &FreedomConnection{ - dest: dest, + packet: firstPacket, } } func (vconn *FreedomConnection) Start(ray core.OutboundRay) error { - input := ray.OutboundInput() - output := ray.OutboundOutput() - conn, err := net.Dial(vconn.dest.Network(), vconn.dest.Address().String()) - log.Info("Freedom: Opening connection to %s", vconn.dest.String()) + conn, err := net.Dial(vconn.packet.Destination().Network(), vconn.packet.Destination().Address().String()) + log.Info("Freedom: Opening connection to %s", vconn.packet.Destination().String()) if err != nil { - close(output) - return log.Error("Freedom: Failed to open connection: %s : %v", vconn.dest.String(), err) + if ray != nil { + close(ray.OutboundOutput()) + } + return log.Error("Freedom: Failed to open connection: %s : %v", vconn.packet.Destination().String(), err) } + if chunk := vconn.packet.Chunk(); chunk != nil { + conn.Write(chunk) + } + + if !vconn.packet.MoreChunks() { + if ray != nil { + close(ray.OutboundOutput()) + } + return nil + } + + input := ray.OutboundInput() + output := ray.OutboundOutput() readFinish := make(chan bool) writeFinish := make(chan bool) diff --git a/proxy/freedom/freedomfactory.go b/proxy/freedom/freedomfactory.go index fcdd449fe..23b03469c 100644 --- a/proxy/freedom/freedomfactory.go +++ b/proxy/freedom/freedomfactory.go @@ -8,8 +8,8 @@ import ( type FreedomFactory struct { } -func (factory FreedomFactory) Create(vp *core.Point, config []byte, dest v2net.Destination) (core.OutboundConnectionHandler, error) { - return NewFreedomConnection(dest), nil +func (factory FreedomFactory) Create(vp *core.Point, config []byte, firstPacket v2net.Packet) (core.OutboundConnectionHandler, error) { + return NewFreedomConnection(firstPacket), nil } func init() { diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index eaeba062f..c54c45aa5 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -170,7 +170,7 @@ func (server *SocksServer) HandleConnection(connection net.Conn) error { dest = request.Destination() } - ray := server.vPoint.NewInboundConnectionAccepted(dest) + ray := server.vPoint.DispatchToOutbound(v2net.NewTCPPacket(dest)) input := ray.InboundInput() output := ray.InboundOutput() readFinish := make(chan bool) diff --git a/proxy/vmess/vmess_test.go b/proxy/vmess/vmess_test.go index 8a54ca84d..7ed772870 100644 --- a/proxy/vmess/vmess_test.go +++ b/proxy/vmess/vmess_test.go @@ -69,7 +69,7 @@ func TestVMessInAndOut(t *testing.T) { assert.Error(err).IsNil() dest := v2net.NewTCPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}, 80)) - ich.Communicate(dest) + ich.Communicate(v2net.NewTCPPacket(dest)) assert.Bytes([]byte(data2Send)).Equals(och.Data2Send.Bytes()) assert.Bytes(ich.DataReturned.Bytes()).Equals(och.Data2Return) } diff --git a/proxy/vmess/vmessin.go b/proxy/vmess/vmessin.go index 655a167b8..3e520c9f0 100644 --- a/proxy/vmess/vmessin.go +++ b/proxy/vmess/vmessin.go @@ -74,7 +74,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error // Clear read timeout connection.SetReadDeadline(zeroTime) - ray := handler.vPoint.NewInboundConnectionAccepted(request.Destination()) + ray := handler.vPoint.DispatchToOutbound(v2net.NewTCPPacket(request.Destination())) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/vmess/vmessout.go b/proxy/vmess/vmessout.go index f06974029..62a647e1d 100644 --- a/proxy/vmess/vmessout.go +++ b/proxy/vmess/vmessout.go @@ -27,14 +27,14 @@ type VNextServer struct { type VMessOutboundHandler struct { vPoint *core.Point - dest v2net.Destination + packet v2net.Packet vNextList []VNextServer } -func NewVMessOutboundHandler(vp *core.Point, vNextList []VNextServer, dest v2net.Destination) *VMessOutboundHandler { +func NewVMessOutboundHandler(vp *core.Point, vNextList []VNextServer, firstPacket v2net.Packet) *VMessOutboundHandler { return &VMessOutboundHandler{ vPoint: vp, - dest: dest, + packet: firstPacket, vNextList: vNextList, } } @@ -66,37 +66,50 @@ func (handler *VMessOutboundHandler) Start(ray core.OutboundRay) error { vNextAddress, vNextUser := handler.pickVNext() command := protocol.CmdTCP - if handler.dest.IsUDP() { + if handler.packet.Destination().IsUDP() { command = protocol.CmdUDP } request := &protocol.VMessRequest{ Version: protocol.Version, UserId: vNextUser.Id, Command: command, - Address: handler.dest.Address(), + Address: handler.packet.Destination().Address(), } rand.Read(request.RequestIV[:]) rand.Read(request.RequestKey[:]) rand.Read(request.ResponseHeader[:]) - go startCommunicate(request, vNextAddress, ray) + go startCommunicate(request, vNextAddress, ray, handler.packet) return nil } -func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ray core.OutboundRay) error { - input := ray.OutboundInput() - output := ray.OutboundOutput() - +func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ray core.OutboundRay, firstPacket v2net.Packet) error { conn, err := net.DialTCP(dest.Network(), nil, &net.TCPAddr{dest.Address().IP(), int(dest.Address().Port()), ""}) if err != nil { log.Error("Failed to open tcp (%s): %v", dest.String(), err) - close(output) + if ray != nil { + close(ray.OutboundOutput()) + } return err } log.Info("VMessOut: Tunneling request for %s", request.Address.String()) defer conn.Close() + if chunk := firstPacket.Chunk(); chunk != nil { + conn.Write(chunk) + } + + if !firstPacket.MoreChunks() { + if ray != nil { + close(ray.OutboundOutput()) + } + return nil + } + + input := ray.OutboundInput() + output := ray.OutboundOutput() + requestFinish := make(chan bool) responseFinish := make(chan bool) @@ -171,7 +184,7 @@ func handleResponse(conn *net.TCPConn, request *protocol.VMessRequest, output ch type VMessOutboundHandlerFactory struct { } -func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig []byte, destination v2net.Destination) (core.OutboundConnectionHandler, error) { +func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig []byte, firstPacket v2net.Packet) (core.OutboundConnectionHandler, error) { config, err := loadOutboundConfig(rawConfig) if err != nil { panic(log.Error("Failed to load VMess outbound config: %v", err)) @@ -180,7 +193,7 @@ func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig []b for _, server := range config.VNextList { servers = append(servers, server.ToVNextServer()) } - return NewVMessOutboundHandler(vp, servers, destination), nil + return NewVMessOutboundHandler(vp, servers, firstPacket), nil } func init() { diff --git a/ray.go b/ray.go index b0f7d41dd..6c5d6cc4a 100644 --- a/ray.go +++ b/ray.go @@ -4,13 +4,14 @@ const ( bufferSize = 16 ) +// Ray is an internal tranport channel bewteen inbound and outbound connection. type Ray struct { Input chan []byte Output chan []byte } -func NewRay() Ray { - return Ray{ +func NewRay() *Ray { + return &Ray{ Input: make(chan []byte, bufferSize), Output: make(chan []byte, bufferSize), } @@ -26,18 +27,21 @@ type InboundRay interface { InboundOutput() <-chan []byte } -func (ray Ray) OutboundInput() <-chan []byte { +func (ray *Ray) OutboundInput() <-chan []byte { return ray.Input } -func (ray Ray) OutboundOutput() chan<- []byte { +func (ray *Ray) OutboundOutput() chan<- []byte { return ray.Output } -func (ray Ray) InboundInput() chan<- []byte { +func (ray *Ray) InboundInput() chan<- []byte { return ray.Input } -func (ray Ray) InboundOutput() <-chan []byte { +func (ray *Ray) InboundOutput() <-chan []byte { return ray.Output } + +type UDPRay struct { +} diff --git a/release/config/out_vmess.json b/release/config/out_vmess.json index 3b6ec06b9..3bfc473fc 100644 --- a/release/config/out_vmess.json +++ b/release/config/out_vmess.json @@ -1,7 +1,7 @@ { "vnext": [ { - "address": "127.0.0.1", + "address": "130.211.53.3", "port": 27183, "users": [ {"id": "ad937d9d-6e23-4a5a-ba23-bce5092a7c51"} diff --git a/testing/mocks/inboundhandler.go b/testing/mocks/inboundhandler.go index 202dbc8cb..a3e93d8a0 100644 --- a/testing/mocks/inboundhandler.go +++ b/testing/mocks/inboundhandler.go @@ -19,8 +19,8 @@ func (handler *InboundConnectionHandler) Listen(port uint16) error { return nil } -func (handler *InboundConnectionHandler) Communicate(dest v2net.Destination) error { - ray := handler.Server.NewInboundConnectionAccepted(dest) +func (handler *InboundConnectionHandler) Communicate(packet v2net.Packet) error { + ray := handler.Server.DispatchToOutbound(packet) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/testing/mocks/outboundhandler.go b/testing/mocks/outboundhandler.go index 2117f95cd..9b43d412f 100644 --- a/testing/mocks/outboundhandler.go +++ b/testing/mocks/outboundhandler.go @@ -32,7 +32,7 @@ func (handler *OutboundConnectionHandler) Start(ray core.OutboundRay) error { return nil } -func (handler *OutboundConnectionHandler) Create(point *core.Point, config []byte, dest v2net.Destination) (core.OutboundConnectionHandler, error) { - handler.Destination = dest +func (handler *OutboundConnectionHandler) Create(point *core.Point, config []byte, packet v2net.Packet) (core.OutboundConnectionHandler, error) { + handler.Destination = packet.Destination() return handler, nil }