From cd81e5531b60b66ef51ce1c36234b8c06e441cf5 Mon Sep 17 00:00:00 2001 From: V2Ray Date: Wed, 7 Oct 2015 00:30:44 +0200 Subject: [PATCH] reuse outbound connection handler --- common/collect/timed_queue.go | 10 ++++---- point.go | 43 +++++++++++++++++--------------- proxy/freedom/freedom.go | 21 +++++++--------- proxy/freedom/freedomfactory.go | 5 ++-- proxy/vmess/vmessout.go | 18 ++++++------- release/server/main.go | 2 +- testing/mocks/outboundhandler.go | 14 +++++------ 7 files changed, 55 insertions(+), 58 deletions(-) diff --git a/common/collect/timed_queue.go b/common/collect/timed_queue.go index 41ddc87dd..2943e7c57 100644 --- a/common/collect/timed_queue.go +++ b/common/collect/timed_queue.go @@ -72,16 +72,16 @@ func (queue *TimedQueue) RemovedEntries() <-chan interface{} { func (queue *TimedQueue) cleanup(tick <-chan time.Time) { for { now := <-tick - queue.access.RLock() - queueLen := queue.queue.Len() - queue.access.RUnlock() + queue.access.RLock() + queueLen := queue.queue.Len() + queue.access.RUnlock() if queueLen == 0 { continue } nowSec := now.UTC().Unix() - queue.access.RLock() + queue.access.RLock() firstEntryTime := queue.queue[0].timeSec - queue.access.RUnlock() + queue.access.RUnlock() if firstEntryTime > nowSec { continue } diff --git a/point.go b/point.go index b921e75af..d52ae14e4 100644 --- a/point.go +++ b/point.go @@ -25,11 +25,9 @@ func RegisterOutboundConnectionHandlerFactory(name string, factory OutboundConne // Point is an single server in V2Ray system. type Point struct { - port uint16 - ichFactory InboundConnectionHandlerFactory - ichConfig interface{} - ochFactory OutboundConnectionHandlerFactory - ochConfig interface{} + port uint16 + ich InboundConnectionHandler + och OutboundConnectionHandler } // NewPoint returns a new Point server based on given configuration. @@ -42,16 +40,25 @@ func NewPoint(pConfig config.PointConfig) (*Point, error) { if !ok { panic(log.Error("Unknown inbound connection handler factory %s", pConfig.InboundConfig().Protocol())) } - vpoint.ichFactory = ichFactory - vpoint.ichConfig = pConfig.InboundConfig().Settings(config.TypeInbound) + ichConfig := pConfig.InboundConfig().Settings(config.TypeInbound) + ich, err := ichFactory.Create(vpoint, ichConfig) + if err != nil { + log.Error("Failed to create inbound connection handler: %v", err) + return nil, err + } + vpoint.ich = ich ochFactory, ok := outboundFactories[pConfig.OutboundConfig().Protocol()] if !ok { panic(log.Error("Unknown outbound connection handler factory %s", pConfig.OutboundConfig().Protocol)) } - - vpoint.ochFactory = ochFactory - vpoint.ochConfig = pConfig.OutboundConfig().Settings(config.TypeOutbound) + ochConfig := pConfig.OutboundConfig().Settings(config.TypeOutbound) + och, err := ochFactory.Create(vpoint, ochConfig) + if err != nil { + log.Error("Failed to create outbound connection handler: %v", err) + return nil, err + } + vpoint.och = och return vpoint, nil } @@ -65,11 +72,11 @@ type InboundConnectionHandler interface { } type OutboundConnectionHandlerFactory interface { - Create(VP *Point, config interface{}, firstPacket v2net.Packet) (OutboundConnectionHandler, error) + Create(VP *Point, config interface{}) (OutboundConnectionHandler, error) } type OutboundConnectionHandler interface { - Start(ray OutboundRay) error + Dispatch(firstPacket v2net.Packet, ray OutboundRay) error } // Start starts the Point server, and return any error during the process. @@ -79,18 +86,14 @@ func (vp *Point) Start() error { return log.Error("Invalid port %d", vp.port) } - inboundConnectionHandler, err := vp.ichFactory.Create(vp, vp.ichConfig) - if err != nil { - return err - } - err = inboundConnectionHandler.Listen(vp.port) - return nil + err := vp.ich.Listen(vp.port) + // TODO: handle error + return err } func (p *Point) DispatchToOutbound(packet v2net.Packet) InboundRay { ray := NewRay() // TODO: handle error - och, _ := p.ochFactory.Create(p, p.ochConfig, packet) - _ = och.Start(ray) + p.och.Dispatch(packet, ray) return ray } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 3d7cdacb2..2adddfd72 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -10,23 +10,20 @@ import ( ) type FreedomConnection struct { - packet v2net.Packet } -func NewFreedomConnection(firstPacket v2net.Packet) *FreedomConnection { - return &FreedomConnection{ - packet: firstPacket, - } +func NewFreedomConnection() *FreedomConnection { + return &FreedomConnection{} } -func (vconn *FreedomConnection) Start(ray core.OutboundRay) error { - conn, err := net.Dial(vconn.packet.Destination().Network(), vconn.packet.Destination().Address().String()) - log.Info("Freedom: Opening connection to %s", vconn.packet.Destination().String()) +func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.OutboundRay) error { + conn, err := net.Dial(firstPacket.Destination().Network(), firstPacket.Destination().Address().String()) + log.Info("Freedom: Opening connection to %s", firstPacket.Destination().String()) if err != nil { if ray != nil { close(ray.OutboundOutput()) } - return log.Error("Freedom: Failed to open connection: %s : %v", vconn.packet.Destination().String(), err) + return log.Error("Freedom: Failed to open connection: %s : %v", firstPacket.Destination().String(), err) } input := ray.OutboundInput() @@ -35,17 +32,17 @@ func (vconn *FreedomConnection) Start(ray core.OutboundRay) error { readMutex.Lock() writeMutex.Lock() - if chunk := vconn.packet.Chunk(); chunk != nil { + if chunk := firstPacket.Chunk(); chunk != nil { conn.Write(chunk) } - if !vconn.packet.MoreChunks() { + if !firstPacket.MoreChunks() { writeMutex.Unlock() } else { go dumpInput(conn, input, &writeMutex) } - go dumpOutput(conn, output, &readMutex, vconn.packet.Destination().IsUDP()) + go dumpOutput(conn, output, &readMutex, firstPacket.Destination().IsUDP()) go func() { writeMutex.Lock() diff --git a/proxy/freedom/freedomfactory.go b/proxy/freedom/freedomfactory.go index d65784fbc..ebd3964ac 100644 --- a/proxy/freedom/freedomfactory.go +++ b/proxy/freedom/freedomfactory.go @@ -2,14 +2,13 @@ package freedom import ( "github.com/v2ray/v2ray-core" - v2net "github.com/v2ray/v2ray-core/common/net" ) type FreedomFactory struct { } -func (factory FreedomFactory) Create(vp *core.Point, config interface{}, firstPacket v2net.Packet) (core.OutboundConnectionHandler, error) { - return NewFreedomConnection(firstPacket), nil +func (factory FreedomFactory) Create(vp *core.Point, config interface{}) (core.OutboundConnectionHandler, error) { + return NewFreedomConnection(), nil } func init() { diff --git a/proxy/vmess/vmessout.go b/proxy/vmess/vmessout.go index 3ca39b72c..7a6d6fbdb 100644 --- a/proxy/vmess/vmessout.go +++ b/proxy/vmess/vmessout.go @@ -28,15 +28,13 @@ type VNextServer struct { type VMessOutboundHandler struct { vPoint *core.Point - packet v2net.Packet vNextList []VNextServer vNextListUDP []VNextServer } -func NewVMessOutboundHandler(vp *core.Point, vNextList, vNextListUDP []VNextServer, firstPacket v2net.Packet) *VMessOutboundHandler { +func NewVMessOutboundHandler(vp *core.Point, vNextList, vNextListUDP []VNextServer) *VMessOutboundHandler { return &VMessOutboundHandler{ vPoint: vp, - packet: firstPacket, vNextList: vNextList, vNextListUDP: vNextListUDP, } @@ -65,28 +63,28 @@ func pickVNext(serverList []VNextServer) (v2net.Destination, user.User) { return vNext.Destination, vNextUser } -func (handler *VMessOutboundHandler) Start(ray core.OutboundRay) error { +func (handler *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray core.OutboundRay) error { vNextList := handler.vNextList - if handler.packet.Destination().IsUDP() { + if firstPacket.Destination().IsUDP() { vNextList = handler.vNextListUDP } vNextAddress, vNextUser := pickVNext(vNextList) command := protocol.CmdTCP - if handler.packet.Destination().IsUDP() { + if firstPacket.Destination().IsUDP() { command = protocol.CmdUDP } request := &protocol.VMessRequest{ Version: protocol.Version, UserId: vNextUser.Id, Command: command, - Address: handler.packet.Destination().Address(), + Address: firstPacket.Destination().Address(), } rand.Read(request.RequestIV[:]) rand.Read(request.RequestKey[:]) rand.Read(request.ResponseHeader[:]) - go startCommunicate(request, vNextAddress, ray, handler.packet) + go startCommunicate(request, vNextAddress, ray, firstPacket) return nil } @@ -195,7 +193,7 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<- type VMessOutboundHandlerFactory struct { } -func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig interface{}, firstPacket v2net.Packet) (core.OutboundConnectionHandler, error) { +func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig interface{}) (core.OutboundConnectionHandler, error) { config := rawConfig.(*VMessOutboundConfig) servers := make([]VNextServer, 0, len(config.VNextList)) udpServers := make([]VNextServer, 0, len(config.VNextList)) @@ -207,7 +205,7 @@ func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig int udpServers = append(udpServers, server.ToVNextServer("udp")) } } - return NewVMessOutboundHandler(vp, servers, udpServers, firstPacket), nil + return NewVMessOutboundHandler(vp, servers, udpServers), nil } func init() { diff --git a/release/server/main.go b/release/server/main.go index 957a94e0a..9b44f85c8 100644 --- a/release/server/main.go +++ b/release/server/main.go @@ -9,8 +9,8 @@ import ( jsonconf "github.com/v2ray/v2ray-core/config/json" // The following are neccesary as they register handlers in their init functions. + _ "github.com/v2ray/v2ray-core/proxy/freedom" _ "github.com/v2ray/v2ray-core/proxy/freedom/config/json" - _ "github.com/v2ray/v2ray-core/proxy/freedom" _ "github.com/v2ray/v2ray-core/proxy/socks" _ "github.com/v2ray/v2ray-core/proxy/vmess" ) diff --git a/testing/mocks/outboundhandler.go b/testing/mocks/outboundhandler.go index bed93678c..0a3ef47d8 100644 --- a/testing/mocks/outboundhandler.go +++ b/testing/mocks/outboundhandler.go @@ -13,10 +13,15 @@ type OutboundConnectionHandler struct { Destination v2net.Destination } -func (handler *OutboundConnectionHandler) Start(ray core.OutboundRay) error { +func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core.OutboundRay) error { input := ray.OutboundInput() output := ray.OutboundOutput() + handler.Destination = packet.Destination() + if packet.Chunk() != nil { + handler.Data2Send.Write(packet.Chunk()) + } + go func() { for { data, open := <-input @@ -34,11 +39,6 @@ func (handler *OutboundConnectionHandler) Start(ray core.OutboundRay) error { return nil } -func (handler *OutboundConnectionHandler) Create(point *core.Point, config interface{}, packet v2net.Packet) (core.OutboundConnectionHandler, error) { - handler.Destination = packet.Destination() - if packet.Chunk() != nil { - handler.Data2Send.Write(packet.Chunk()) - } - +func (handler *OutboundConnectionHandler) Create(point *core.Point, config interface{}) (core.OutboundConnectionHandler, error) { return handler, nil }