From 4046ee968c6aea6a385f435493a72f3eb14d9f3f Mon Sep 17 00:00:00 2001 From: V2Ray Date: Fri, 27 Nov 2015 21:50:28 +0100 Subject: [PATCH] refactor code --- proxy/freedom/freedom.go | 55 ++++++++++++------------- proxy/freedom/freedomfactory.go | 2 +- proxy/socks/socks.go | 71 ++++++++++++++++----------------- 3 files changed, 60 insertions(+), 68 deletions(-) diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 2276afc0f..4080440b4 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -4,7 +4,6 @@ import ( "net" "sync" - "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport/ray" @@ -17,7 +16,7 @@ func NewFreedomConnection() *FreedomConnection { return &FreedomConnection{} } -func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { +func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { conn, err := net.Dial(firstPacket.Destination().Network(), firstPacket.Destination().Address().String()) log.Info("Freedom: Opening connection to %s", firstPacket.Destination().String()) if err != nil { @@ -40,10 +39,32 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbo if !firstPacket.MoreChunks() { writeMutex.Unlock() } else { - go dumpInput(conn, input, &writeMutex) + go func() { + v2net.ChanToWriter(conn, input) + writeMutex.Unlock() + }() } - go dumpOutput(conn, output, &readMutex, firstPacket.Destination().IsUDP()) + go func() { + defer readMutex.Unlock() + defer close(output) + + response, err := v2net.ReadFrom(conn, nil) + log.Info("Freedom receives %d bytes from %s", response.Len(), conn.RemoteAddr().String()) + if response.Len() > 0 { + output <- response + } else { + response.Release() + } + if err != nil { + return + } + if firstPacket.Destination().IsUDP() { + return + } + + v2net.ReaderToChan(output, conn) + }() writeMutex.Lock() if tcpConn, ok := conn.(*net.TCPConn); ok { @@ -54,29 +75,3 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbo return nil } - -func dumpInput(conn net.Conn, input <-chan *alloc.Buffer, finish *sync.Mutex) { - v2net.ChanToWriter(conn, input) - finish.Unlock() -} - -func dumpOutput(conn net.Conn, output chan<- *alloc.Buffer, finish *sync.Mutex, udp bool) { - defer finish.Unlock() - defer close(output) - - response, err := v2net.ReadFrom(conn, nil) - log.Info("Freedom receives %d bytes from %s", response.Len(), conn.RemoteAddr().String()) - if response.Len() > 0 { - output <- response - } else { - response.Release() - } - if err != nil { - return - } - if udp { - return - } - - v2net.ReaderToChan(output, conn) -} diff --git a/proxy/freedom/freedomfactory.go b/proxy/freedom/freedomfactory.go index 96eef339d..604dac504 100644 --- a/proxy/freedom/freedomfactory.go +++ b/proxy/freedom/freedomfactory.go @@ -7,7 +7,7 @@ import ( type FreedomFactory struct { } -func (factory FreedomFactory) Create(config interface{}) (connhandler.OutboundConnectionHandler, error) { +func (this FreedomFactory) Create(config interface{}) (connhandler.OutboundConnectionHandler, error) { return NewFreedomConnection(), nil } diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index b70785b31..29d54f333 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -36,7 +36,7 @@ func NewSocksServer(dispatcher app.PacketDispatcher, config *jsonconfig.SocksCon } } -func (server *SocksServer) Listen(port uint16) error { +func (this *SocksServer) Listen(port uint16) error { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: []byte{0, 0, 0, 0}, Port: int(port), @@ -46,30 +46,30 @@ func (server *SocksServer) Listen(port uint16) error { log.Error("Socks failed to listen on port %d: %v", port, err) return err } - server.accepting = true - go server.AcceptConnections(listener) - if server.config.UDPEnabled { - server.ListenUDP(port) + this.accepting = true + go this.AcceptConnections(listener) + if this.config.UDPEnabled { + this.ListenUDP(port) } return nil } -func (server *SocksServer) AcceptConnections(listener *net.TCPListener) { - for server.accepting { +func (this *SocksServer) AcceptConnections(listener *net.TCPListener) { + for this.accepting { retry.Timed(100 /* times */, 100 /* ms */).On(func() error { connection, err := listener.AcceptTCP() if err != nil { log.Error("Socks failed to accept new connection %v", err) return err } - go server.HandleConnection(connection) + go this.HandleConnection(connection) return nil }) } } -func (server *SocksServer) HandleConnection(connection *net.TCPConn) error { +func (this *SocksServer) HandleConnection(connection *net.TCPConn) error { defer connection.Close() reader := v2net.NewTimeOutReader(120, connection) @@ -81,15 +81,15 @@ func (server *SocksServer) HandleConnection(connection *net.TCPConn) error { } if err != nil && err == protocol.Socks4Downgrade { - return server.handleSocks4(reader, connection, auth4) + return this.handleSocks4(reader, connection, auth4) } else { - return server.handleSocks5(reader, connection, auth) + return this.handleSocks5(reader, connection, auth) } } -func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.Writer, auth protocol.Socks5AuthenticationRequest) error { +func (this *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.Writer, auth protocol.Socks5AuthenticationRequest) error { expectedAuthMethod := protocol.AuthNotRequired - if server.config.IsPassword() { + if this.config.IsPassword() { expectedAuthMethod = protocol.AuthUserPass } @@ -110,14 +110,14 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W log.Error("Socks failed to write authentication: %v", err) return err } - if server.config.IsPassword() { + if this.config.IsPassword() { upRequest, err := protocol.ReadUserPassRequest(reader) if err != nil { log.Error("Socks failed to read username and password: %v", err) return err } status := byte(0) - if !server.config.HasAccount(upRequest.Username(), upRequest.Password()) { + if !this.config.HasAccount(upRequest.Username(), upRequest.Password()) { status = byte(0xFF) } upResponse := protocol.NewSocks5UserPassResponse(status) @@ -138,8 +138,8 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W return err } - if request.Command == protocol.CmdUdpAssociate && server.config.UDPEnabled { - return server.handleUDP(reader, writer) + if request.Command == protocol.CmdUdpAssociate && this.config.UDPEnabled { + return this.handleUDP(reader, writer) } if request.Command == protocol.CmdBind || request.Command == protocol.CmdUdpAssociate { @@ -183,15 +183,15 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W } packet := v2net.NewPacket(dest, data, true) - server.transport(reader, writer, packet) + this.transport(reader, writer, packet) return nil } -func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writer) error { +func (this *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writer) error { response := protocol.NewSocks5Response() response.Error = protocol.ErrorSuccess - udpAddr := server.getUDPAddr() + udpAddr := this.getUDPAddr() response.Port = udpAddr.Port() switch { @@ -223,7 +223,7 @@ func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writ return nil } -func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth protocol.Socks4AuthenticationRequest) error { +func (this *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth protocol.Socks4AuthenticationRequest) error { result := protocol.Socks4RequestGranted if auth.Command == protocol.CmdBind { result = protocol.Socks4RequestRejected @@ -247,12 +247,12 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth } packet := v2net.NewPacket(dest, data, true) - server.transport(reader, writer, packet) + this.transport(reader, writer, packet) return nil } -func (server *SocksServer) transport(reader io.Reader, writer io.Writer, firstPacket v2net.Packet) { - ray := server.dispatcher.DispatchToOutbound(firstPacket) +func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPacket v2net.Packet) { + ray := this.dispatcher.DispatchToOutbound(firstPacket) input := ray.InboundInput() output := ray.InboundOutput() @@ -260,18 +260,15 @@ func (server *SocksServer) transport(reader io.Reader, writer io.Writer, firstPa inputFinish.Lock() outputFinish.Lock() - go dumpInput(reader, input, &inputFinish) - go dumpOutput(writer, output, &outputFinish) + go func() { + v2net.ReaderToChan(input, reader) + inputFinish.Unlock() + close(input) + }() + + go func() { + v2net.ChanToWriter(writer, output) + outputFinish.Unlock() + }() outputFinish.Lock() } - -func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { - v2net.ReaderToChan(input, reader) - finish.Unlock() - close(input) -} - -func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) { - v2net.ChanToWriter(writer, output) - finish.Unlock() -}