diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 96041dc8b..aa173753a 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -9,8 +9,8 @@ import ( "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/listener" ) type DokodemoDoor struct { @@ -21,7 +21,7 @@ type DokodemoDoor struct { address v2net.Address port v2net.Port space app.Space - tcpListener *net.TCPListener + tcpListener *listener.TCPListener udpConn *net.UDPConn listeningPort v2net.Port } @@ -42,8 +42,8 @@ func (this *DokodemoDoor) Port() v2net.Port { func (this *DokodemoDoor) Close() { this.accepting = false if this.tcpListener != nil { - this.tcpListener.Close() this.tcpMutex.Lock() + this.tcpListener.Close() this.tcpListener = nil this.tcpMutex.Unlock() } @@ -132,42 +132,18 @@ func (this *DokodemoDoor) handleUDPPackets() { } func (this *DokodemoDoor) ListenTCP(port v2net.Port) error { - tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ - IP: []byte{0, 0, 0, 0}, - Port: int(port), - Zone: "", - }) + tcpListener, err := listener.ListenTCP(port, this.HandleTCPConnection) if err != nil { - log.Error("Dokodemo failed to listen on port ", port, ": ", err) + log.Error("Dokodemo: Failed to listen on port ", port, ": ", err) return err } this.tcpMutex.Lock() this.tcpListener = tcpListener this.tcpMutex.Unlock() - go this.AcceptTCPConnections() return nil } -func (this *DokodemoDoor) AcceptTCPConnections() { - for this.accepting { - retry.Timed(100, 100).On(func() error { - this.tcpMutex.RLock() - defer this.tcpMutex.RUnlock() - if !this.accepting { - return nil - } - connection, err := this.tcpListener.AcceptTCP() - if err != nil { - log.Error("Dokodemo failed to accept new connections: ", err) - return err - } - go this.HandleTCPConnection(connection) - return nil - }) - } -} - -func (this *DokodemoDoor) HandleTCPConnection(conn *net.TCPConn) { +func (this *DokodemoDoor) HandleTCPConnection(conn *listener.TCPConn) { defer conn.Close() packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true) diff --git a/proxy/http/http.go b/proxy/http/http.go index 79427be71..1e14f83d2 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -13,9 +13,9 @@ import ( "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/common/serial" "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/listener" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -24,7 +24,7 @@ type HttpProxyServer struct { accepting bool space app.Space config *Config - tcpListener *net.TCPListener + tcpListener *listener.TCPListener listeningPort v2net.Port } @@ -42,8 +42,8 @@ func (this *HttpProxyServer) Port() v2net.Port { func (this *HttpProxyServer) Close() { this.accepting = false if this.tcpListener != nil { - this.tcpListener.Close() this.Lock() + this.tcpListener.Close() this.tcpListener = nil this.Unlock() } @@ -59,40 +59,18 @@ func (this *HttpProxyServer) Listen(port v2net.Port) error { } this.listeningPort = port - tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ - Port: int(port.Value()), - IP: []byte{0, 0, 0, 0}, - }) + tcpListener, err := listener.ListenTCP(port, this.handleConnection) if err != nil { + log.Error("Http: Failed listen on port ", port, ": ", err) return err } this.Lock() this.tcpListener = tcpListener this.Unlock() this.accepting = true - go this.accept() return nil } -func (this *HttpProxyServer) accept() { - for this.accepting { - retry.Timed(100 /* times */, 100 /* ms */).On(func() error { - this.Lock() - defer this.Unlock() - if !this.accepting { - return nil - } - tcpConn, err := this.tcpListener.AcceptTCP() - if err != nil { - log.Error("Failed to accept HTTP connection: ", err) - return err - } - go this.handleConnection(tcpConn) - return nil - }) - } -} - func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error) { port := defaultPort host, rawPort, err := net.SplitHostPort(rawHost) @@ -116,7 +94,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error return v2net.TCPDestination(v2net.DomainAddress(host), port), nil } -func (this *HttpProxyServer) handleConnection(conn *net.TCPConn) { +func (this *HttpProxyServer) handleConnection(conn *listener.TCPConn) { defer conn.Close() reader := bufio.NewReader(conn) diff --git a/proxy/shadowsocks/config.go b/proxy/shadowsocks/config.go index 2a876c387..f15b1dce6 100644 --- a/proxy/shadowsocks/config.go +++ b/proxy/shadowsocks/config.go @@ -46,4 +46,5 @@ func (this *AesCfb) NewDecodingStream(key []byte, iv []byte, reader io.Reader) ( type Config struct { Cipher Cipher Password string + UDP bool } diff --git a/proxy/shadowsocks/config_json.go b/proxy/shadowsocks/config_json.go index 6b5799aca..4a2ed17c5 100644 --- a/proxy/shadowsocks/config_json.go +++ b/proxy/shadowsocks/config_json.go @@ -14,6 +14,7 @@ func (this *Config) UnmarshalJSON(data []byte) error { type JsonConfig struct { Cipher serial.StringLiteral `json:"method"` Password serial.StringLiteral `json:"password"` + UDP bool `json:"udp"` } jsonConfig := new(JsonConfig) if err := json.Unmarshal(data, jsonConfig); err != nil { @@ -23,6 +24,7 @@ func (this *Config) UnmarshalJSON(data []byte) error { log.Error("Shadowsocks: Password is not specified.") return internal.ErrorBadConfiguration } + this.UDP = jsonConfig.UDP this.Password = jsonConfig.Password.String() if this.Cipher == nil { log.Error("Shadowsocks: Cipher method is not specified.") diff --git a/proxy/shadowsocks/shadowsocks.go b/proxy/shadowsocks/shadowsocks.go index 666346e03..e8eef6d7a 100644 --- a/proxy/shadowsocks/shadowsocks.go +++ b/proxy/shadowsocks/shadowsocks.go @@ -3,18 +3,49 @@ package shadowsocks import ( + "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/listener" ) type Shadowsocks struct { - config *Config - port v2net.Port + config *Config + port v2net.Port + accepting bool + tcpListener *listener.TCPListener } func (this *Shadowsocks) Port() v2net.Port { return this.port } +func (this *Shadowsocks) Close() { + this.accepting = false + this.tcpListener.Close() + this.tcpListener = nil +} + func (this *Shadowsocks) Listen(port v2net.Port) error { + if this.accepting { + if this.port == port { + return nil + } else { + return proxy.ErrorAlreadyListening + } + } + + tcpListener, err := listener.ListenTCP(port, this.handleConnection) + if err != nil { + log.Error("Shadowsocks: Failed to listen on port ", port, ": ", err) + return err + } + this.tcpListener = tcpListener + this.accepting = true return nil } + +func (this *Shadowsocks) handleConnection(conn *listener.TCPConn) { + defer conn.Close() + +} diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 92251b3bf..66d66942f 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -11,9 +11,9 @@ import ( "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/socks/protocol" + "github.com/v2ray/v2ray-core/transport/listener" ) var ( @@ -28,7 +28,7 @@ type SocksServer struct { accepting bool space app.Space config *Config - tcpListener *net.TCPListener + tcpListener *listener.TCPListener udpConn *net.UDPConn udpAddress v2net.Destination listeningPort v2net.Port @@ -48,8 +48,8 @@ func (this *SocksServer) Port() v2net.Port { func (this *SocksServer) Close() { this.accepting = false if this.tcpListener != nil { - this.tcpListener.Close() this.tcpMutex.Lock() + this.tcpListener.Close() this.tcpListener = nil this.tcpMutex.Unlock() } @@ -71,11 +71,7 @@ func (this *SocksServer) Listen(port v2net.Port) error { } this.listeningPort = port - listener, err := net.ListenTCP("tcp", &net.TCPAddr{ - IP: []byte{0, 0, 0, 0}, - Port: int(port), - Zone: "", - }) + listener, err := listener.ListenTCP(port, this.HandleConnection) if err != nil { log.Error("Socks: failed to listen on port ", port, ": ", err) return err @@ -84,33 +80,13 @@ func (this *SocksServer) Listen(port v2net.Port) error { this.tcpMutex.Lock() this.tcpListener = listener this.tcpMutex.Unlock() - go this.AcceptConnections() if this.config.UDPEnabled { this.ListenUDP(port) } return nil } -func (this *SocksServer) AcceptConnections() { - for this.accepting { - retry.Timed(100 /* times */, 100 /* ms */).On(func() error { - this.tcpMutex.RLock() - defer this.tcpMutex.RUnlock() - if !this.accepting { - return nil - } - connection, err := this.tcpListener.AcceptTCP() - if err != nil { - log.Error("Socks: failed to accept new connection: ", err) - return err - } - go this.HandleConnection(connection) - return nil - }) - } -} - -func (this *SocksServer) HandleConnection(connection *net.TCPConn) error { +func (this *SocksServer) HandleConnection(connection *listener.TCPConn) { defer connection.Close() reader := v2net.NewTimeOutReader(120, connection) @@ -118,13 +94,13 @@ func (this *SocksServer) HandleConnection(connection *net.TCPConn) error { auth, auth4, err := protocol.ReadAuthentication(reader) if err != nil && err != protocol.Socks4Downgrade { log.Error("Socks: failed to read authentication: ", err) - return err + return } if err != nil && err == protocol.Socks4Downgrade { - return this.handleSocks4(reader, connection, auth4) + this.handleSocks4(reader, connection, auth4) } else { - return this.handleSocks5(reader, connection, auth) + this.handleSocks5(reader, connection, auth) } } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index bbcc6a1ae..103536527 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -3,7 +3,6 @@ package inbound import ( "crypto/md5" "io" - "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -11,12 +10,12 @@ import ( v2crypto "github.com/v2ray/v2ray-core/common/crypto" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/common/serial" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" "github.com/v2ray/v2ray-core/proxy/vmess" "github.com/v2ray/v2ray-core/proxy/vmess/protocol" + "github.com/v2ray/v2ray-core/transport/listener" ) // Inbound connection handler that handles messages in VMess format. @@ -26,7 +25,7 @@ type VMessInboundHandler struct { clients protocol.UserSet user *vmess.User accepting bool - listener *net.TCPListener + listener *listener.TCPListener features *FeaturesConfig listeningPort v2net.Port } @@ -38,8 +37,8 @@ func (this *VMessInboundHandler) Port() v2net.Port { func (this *VMessInboundHandler) Close() { this.accepting = false if this.listener != nil { - this.listener.Close() this.Lock() + this.listener.Close() this.listener = nil this.Unlock() } @@ -59,45 +58,19 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error { } this.listeningPort = port - listener, err := net.ListenTCP("tcp", &net.TCPAddr{ - IP: []byte{0, 0, 0, 0}, - Port: int(port), - Zone: "", - }) + tcpListener, err := listener.ListenTCP(port, this.HandleConnection) if err != nil { log.Error("Unable to listen tcp port ", port, ": ", err) return err } this.accepting = true this.Lock() - this.listener = listener + this.listener = tcpListener this.Unlock() - go this.AcceptConnections() return nil } -func (this *VMessInboundHandler) AcceptConnections() error { - for this.accepting { - retry.Timed(100 /* times */, 100 /* ms */).On(func() error { - this.Lock() - defer this.Unlock() - if !this.accepting { - return nil - } - connection, err := this.listener.AcceptTCP() - if err != nil { - log.Error("Failed to accpet connection: ", err) - return err - } - go this.HandleConnection(connection) - return nil - }) - - } - return nil -} - -func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error { +func (this *VMessInboundHandler) HandleConnection(connection *listener.TCPConn) { defer connection.Close() connReader := v2net.NewTimeOutReader(16, connection) @@ -107,7 +80,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error if err != nil { log.Access(connection.RemoteAddr(), serial.StringLiteral(""), log.AccessRejected, serial.StringLiteral(err.Error())) log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err) - return err + return } log.Access(connection.RemoteAddr(), request.Address, log.AccessAccepted, serial.StringLiteral("")) log.Debug("VMessIn: Received request for ", request.Address) @@ -130,7 +103,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error if err != nil { log.Error("VMessIn: Failed to create AES decryption stream: ", err) close(input) - return err + return } responseWriter := v2crypto.NewCryptionWriter(aesStream, connection) @@ -151,8 +124,6 @@ func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error connection.CloseWrite() readFinish.Lock() - - return nil } func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { diff --git a/transport/listener/tcp.go b/transport/listener/tcp.go new file mode 100644 index 000000000..bf1566608 --- /dev/null +++ b/transport/listener/tcp.go @@ -0,0 +1,110 @@ +package listener + +import ( + "net" + "time" + + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" +) + +type TCPConn struct { + conn *net.TCPConn + listener *TCPListener + dirty bool +} + +func (this *TCPConn) Read(b []byte) (int, error) { + return this.conn.Read(b) +} + +func (this *TCPConn) Write(b []byte) (int, error) { + return this.conn.Write(b) +} + +func (this *TCPConn) Close() error { + return this.conn.Close() +} + +func (this *TCPConn) Release() { + if this.dirty { + this.Close() + return + } + this.listener.recycle(this.conn) +} + +func (this *TCPConn) LocalAddr() net.Addr { + return this.conn.LocalAddr() +} + +func (this *TCPConn) RemoteAddr() net.Addr { + return this.conn.RemoteAddr() +} + +func (this *TCPConn) SetDeadline(t time.Time) error { + return this.conn.SetDeadline(t) +} + +func (this *TCPConn) SetReadDeadline(t time.Time) error { + return this.conn.SetReadDeadline(t) +} + +func (this *TCPConn) SetWriteDeadline(t time.Time) error { + return this.conn.SetWriteDeadline(t) +} + +func (this *TCPConn) CloseRead() error { + return this.conn.CloseRead() +} + +func (this *TCPConn) CloseWrite() error { + return this.conn.CloseWrite() +} + +type TCPListener struct { + listener *net.TCPListener + connCallback func(*TCPConn) + accepting bool +} + +func ListenTCP(port v2net.Port, callback func(*TCPConn)) (*TCPListener, error) { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: int(port), + Zone: "", + }) + if err != nil { + return nil, err + } + tcpListener := &TCPListener{ + listener: listener, + connCallback: callback, + } + go tcpListener.start() + return tcpListener, nil +} + +func (this *TCPListener) Close() { + this.accepting = false + this.listener.Close() +} + +func (this *TCPListener) start() { + this.accepting = true + for this.accepting { + conn, err := this.listener.AcceptTCP() + if err != nil { + log.Warning("Listener: Failed to accept new TCP connection: ", err) + continue + } + go this.connCallback(&TCPConn{ + conn: conn, + listener: this, + }) + } +} + +func (this *TCPListener) recycle(conn *net.TCPConn) { + +}