From 201481a82c425e8069a5b8d4c467882d7e69be16 Mon Sep 17 00:00:00 2001 From: v2ray Date: Sun, 3 Jan 2016 23:30:37 +0100 Subject: [PATCH] close method for inbound connection handler --- proxy/dokodemo/dokodemo.go | 60 ++++++++++++++++++++------- proxy/http/http.go | 45 ++++++++++++++------ proxy/proxy.go | 1 + proxy/socks/socks.go | 45 ++++++++++++++++---- proxy/socks/udp.go | 36 +++++++++------- proxy/testing/mocks/inboundhandler.go | 4 ++ proxy/vmess/inbound/inbound.go | 36 ++++++++++++---- 7 files changed, 171 insertions(+), 56 deletions(-) diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index bfbb3441c..4ed3bd732 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -13,11 +13,14 @@ import ( ) type DokodemoDoor struct { - config Config - accepting bool - address v2net.Address - port v2net.Port - space app.Space + sync.Mutex + config Config + accepting bool + address v2net.Address + port v2net.Port + space app.Space + tcpListener *net.TCPListener + udpConn *net.UDPConn } func NewDokodemoDoor(space app.Space, config Config) *DokodemoDoor { @@ -29,6 +32,22 @@ func NewDokodemoDoor(space app.Space, config Config) *DokodemoDoor { } } +func (this *DokodemoDoor) Close() { + this.accepting = false + if this.tcpListener != nil { + this.Lock() + this.tcpListener.Close() + this.tcpListener = nil + this.Unlock() + } + if this.udpConn != nil { + this.Lock() + this.udpConn.Close() + this.udpConn = nil + this.Unlock() + } +} + func (this *DokodemoDoor) Listen(port v2net.Port) error { this.accepting = true @@ -57,14 +76,20 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error { log.Error("Dokodemo failed to listen on port %d: %v", port, err) return err } - go this.handleUDPPackets(udpConn) + this.udpConn = udpConn + go this.handleUDPPackets() return nil } -func (this *DokodemoDoor) handleUDPPackets(udpConn *net.UDPConn) { - defer udpConn.Close() +func (this *DokodemoDoor) handleUDPPackets() { for this.accepting { buffer := alloc.NewBuffer() + var udpConn *net.UDPConn + this.Lock() + if this.udpConn != nil { + udpConn = this.udpConn + } + this.Unlock() nBytes, addr, err := udpConn.ReadFromUDP(buffer.Value) buffer.Slice(0, nBytes) if err != nil { @@ -93,19 +118,24 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error { log.Error("Dokodemo failed to listen on port %d: %v", port, err) return err } - go this.AcceptTCPConnections(tcpListener) + this.tcpListener = tcpListener + go this.AcceptTCPConnections() return nil } -func (this *DokodemoDoor) AcceptTCPConnections(tcpListener *net.TCPListener) { +func (this *DokodemoDoor) AcceptTCPConnections() { for this.accepting { retry.Timed(100, 100).On(func() error { - connection, err := tcpListener.AcceptTCP() - if err != nil { - log.Error("Dokodemo failed to accept new connections: %v", err) - return err + this.Lock() + defer this.Unlock() + if this.tcpListener != nil { + connection, err := this.tcpListener.AcceptTCP() + if err != nil { + log.Error("Dokodemo failed to accept new connections: %v", err) + return err + } + go this.HandleTCPConnection(connection) } - go this.HandleTCPConnection(connection) return nil }) } diff --git a/proxy/http/http.go b/proxy/http/http.go index 8689a7c69..69c9b03c7 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -13,13 +13,16 @@ import ( "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/transport/ray" ) type HttpProxyServer struct { - accepting bool - space app.Space - config Config + sync.Mutex + accepting bool + space app.Space + config Config + tcpListener *net.TCPListener } func NewHttpProxyServer(space app.Space, config Config) *HttpProxyServer { @@ -29,6 +32,16 @@ func NewHttpProxyServer(space app.Space, config Config) *HttpProxyServer { } } +func (this *HttpProxyServer) Close() { + this.accepting = false + if this.tcpListener != nil { + this.Lock() + this.tcpListener.Close() + this.tcpListener = nil + this.Unlock() + } +} + func (this *HttpProxyServer) Listen(port v2net.Port) error { tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ Port: int(port.Value()), @@ -37,19 +50,27 @@ func (this *HttpProxyServer) Listen(port v2net.Port) error { if err != nil { return err } - go this.accept(tcpListener) + this.tcpListener = tcpListener + this.accepting = true + go this.accept() return nil } -func (this *HttpProxyServer) accept(listener *net.TCPListener) { - this.accepting = true +func (this *HttpProxyServer) accept() { for this.accepting { - tcpConn, err := listener.AcceptTCP() - if err != nil { - log.Error("Failed to accept HTTP connection: %v", err) - continue - } - go this.handleConnection(tcpConn) + retry.Timed(100 /* times */, 100 /* ms */).On(func() error { + this.Lock() + defer this.Unlock() + if this.tcpListener != nil { + tcpConn, err := this.tcpListener.AcceptTCP() + if err != nil { + log.Error("Failed to accept HTTP connection: %v", err) + return err + } + go this.handleConnection(tcpConn) + } + return nil + }) } } diff --git a/proxy/proxy.go b/proxy/proxy.go index e3f9beb4a..aae7132d5 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -12,6 +12,7 @@ type InboundConnectionHandler interface { // Listen starts a InboundConnectionHandler by listen on a specific port. This method is called // exactly once during runtime. Listen(port v2net.Port) error + Close() } // An OutboundConnectionHandler handles outbound network connection for V2Ray. diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 48664b18d..44a34fdd3 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -23,9 +23,13 @@ var ( // SocksServer is a SOCKS 5 proxy server type SocksServer struct { - accepting bool - space app.Space - config Config + sync.RWMutex + accepting bool + space app.Space + config Config + tcpListener *net.TCPListener + udpConn *net.UDPConn + udpAddress v2net.Destination } func NewSocksServer(space app.Space, config Config) *SocksServer { @@ -35,6 +39,26 @@ func NewSocksServer(space app.Space, config Config) *SocksServer { } } +func (this *SocksServer) Close() { + this.accepting = false + if this.tcpListener != nil { + this.Lock() + if this.tcpListener != nil { + this.tcpListener.Close() + this.tcpListener = nil + } + this.Unlock() + } + if this.udpConn != nil { + this.Lock() + if this.udpConn != nil { + this.udpConn.Close() + this.udpConn = nil + } + this.Unlock() + } +} + func (this *SocksServer) Listen(port v2net.Port) error { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: []byte{0, 0, 0, 0}, @@ -46,17 +70,23 @@ func (this *SocksServer) Listen(port v2net.Port) error { return err } this.accepting = true - go this.AcceptConnections(listener) + this.tcpListener = listener + go this.AcceptConnections() if this.config.UDPEnabled() { this.ListenUDP(port) } return nil } -func (this *SocksServer) AcceptConnections(listener *net.TCPListener) { +func (this *SocksServer) AcceptConnections() { for this.accepting { retry.Timed(100 /* times */, 100 /* ms */).On(func() error { - connection, err := listener.AcceptTCP() + this.RLock() + defer this.RUnlock() + if !this.accepting { + return nil + } + connection, err := this.tcpListener.AcceptTCP() if err != nil { log.Error("Socks failed to accept new connection %v", err) return err @@ -64,7 +94,6 @@ func (this *SocksServer) AcceptConnections(listener *net.TCPListener) { go this.HandleConnection(connection) return nil }) - } } @@ -187,7 +216,7 @@ func (this *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writer response := protocol.NewSocks5Response() response.Error = protocol.ErrorSuccess - udpAddr := this.getUDPAddr() + udpAddr := this.udpAddress response.Port = udpAddr.Port() switch { diff --git a/proxy/socks/udp.go b/proxy/socks/udp.go index 9a8c8a112..3d2dce3e5 100644 --- a/proxy/socks/udp.go +++ b/proxy/socks/udp.go @@ -9,8 +9,6 @@ import ( "github.com/v2ray/v2ray-core/proxy/socks/protocol" ) -var udpAddress v2net.Destination - func (this *SocksServer) ListenUDP(port v2net.Port) error { addr := &net.UDPAddr{ IP: net.IP{0, 0, 0, 0}, @@ -22,20 +20,23 @@ func (this *SocksServer) ListenUDP(port v2net.Port) error { log.Error("Socks failed to listen UDP on port %d: %v", port, err) return err } - udpAddress = v2net.UDPDestination(v2net.IPAddress(this.config.IP()), port) + this.udpAddress = v2net.UDPDestination(v2net.IPAddress(this.config.IP()), port) + this.udpConn = conn - go this.AcceptPackets(conn) + go this.AcceptPackets() return nil } -func (this *SocksServer) getUDPAddr() v2net.Destination { - return udpAddress -} - -func (this *SocksServer) AcceptPackets(conn *net.UDPConn) error { - for { +func (this *SocksServer) AcceptPackets() error { + for this.accepting { buffer := alloc.NewBuffer() - nBytes, addr, err := conn.ReadFromUDP(buffer.Value) + this.RLock() + if !this.accepting { + this.RUnlock() + return nil + } + nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value) + this.RUnlock() if err != nil { log.Error("Socks failed to read UDP packets: %v", err) buffer.Release() @@ -60,11 +61,12 @@ func (this *SocksServer) AcceptPackets(conn *net.UDPConn) error { udpPacket := v2net.NewPacket(request.Destination(), request.Data, false) log.Info("Send packet to %s with %d bytes", udpPacket.Destination().String(), request.Data.Len()) - go this.handlePacket(conn, udpPacket, addr, request.Address, request.Port) + go this.handlePacket(udpPacket, addr, request.Address, request.Port) } + return nil } -func (this *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, clientAddr *net.UDPAddr, targetAddr v2net.Address, port v2net.Port) { +func (this *SocksServer) handlePacket(packet v2net.Packet, clientAddr *net.UDPAddr, targetAddr v2net.Address, port v2net.Port) { ray := this.space.PacketDispatcher().DispatchToOutbound(packet) close(ray.InboundInput()) @@ -80,7 +82,13 @@ func (this *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, cl udpMessage := alloc.NewSmallBuffer().Clear() response.Write(udpMessage) - nBytes, err := conn.WriteToUDP(udpMessage.Value, clientAddr) + this.RLock() + if !this.accepting { + this.RUnlock() + return + } + nBytes, err := this.udpConn.WriteToUDP(udpMessage.Value, clientAddr) + this.RUnlock() udpMessage.Release() response.Data.Release() if err != nil { diff --git a/proxy/testing/mocks/inboundhandler.go b/proxy/testing/mocks/inboundhandler.go index 9c437704e..d38b87843 100644 --- a/proxy/testing/mocks/inboundhandler.go +++ b/proxy/testing/mocks/inboundhandler.go @@ -20,6 +20,10 @@ func (this *InboundConnectionHandler) Listen(port v2net.Port) error { return nil } +func (this *InboundConnectionHandler) Close() { + +} + func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error { ray := this.Space.PacketDispatcher().DispatchToOutbound(packet) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 70b1712e8..36c40fba4 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -21,9 +21,11 @@ import ( // Inbound connection handler that handles messages in VMess format. type VMessInboundHandler struct { + sync.Mutex space app.Space clients user.UserSet accepting bool + listener *net.TCPListener } func NewVMessInboundHandler(space app.Space, clients user.UserSet) *VMessInboundHandler { @@ -33,6 +35,18 @@ func NewVMessInboundHandler(space app.Space, clients user.UserSet) *VMessInbound } } +func (this *VMessInboundHandler) Close() { + this.accepting = false + if this.listener != nil { + this.Lock() + if this.listener != nil { + this.listener.Close() + this.listener = nil + } + this.Unlock() + } +} + func (this *VMessInboundHandler) Listen(port v2net.Port) error { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: []byte{0, 0, 0, 0}, @@ -44,19 +58,27 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error { return err } this.accepting = true - go this.AcceptConnections(listener) + this.listener = listener + go this.AcceptConnections() return nil } -func (this *VMessInboundHandler) AcceptConnections(listener *net.TCPListener) error { +func (this *VMessInboundHandler) AcceptConnections() error { for this.accepting { retry.Timed(100 /* times */, 100 /* ms */).On(func() error { - connection, err := listener.AcceptTCP() - if err != nil { - log.Error("Failed to accpet connection: %s", err.Error()) - return err + if !this.accepting { + return nil + } + this.Lock() + defer this.Unlock() + if this.listener != nil { + connection, err := this.listener.AcceptTCP() + if err != nil { + log.Error("Failed to accpet connection: %s", err.Error()) + return err + } + go this.HandleConnection(connection) } - go this.HandleConnection(connection) return nil })