diff --git a/common/net/timed_io.go b/common/net/timed_io.go deleted file mode 100644 index cfd356e96..000000000 --- a/common/net/timed_io.go +++ /dev/null @@ -1,72 +0,0 @@ -package net - -import ( - "io" - "net" - "time" -) - -var ( - emptyTime time.Time -) - -type TimeOutReader struct { - timeout uint32 - connection net.Conn - worker io.Reader -} - -func NewTimeOutReader(timeout uint32 /* seconds */, connection net.Conn) *TimeOutReader { - reader := &TimeOutReader{ - connection: connection, - timeout: 0, - } - reader.SetTimeOut(timeout) - return reader -} - -func (reader *TimeOutReader) Read(p []byte) (int, error) { - return reader.worker.Read(p) -} - -func (reader *TimeOutReader) GetTimeOut() uint32 { - return reader.timeout -} - -func (reader *TimeOutReader) SetTimeOut(value uint32) { - if reader.worker != nil && value == reader.timeout { - return - } - reader.timeout = value - if value > 0 { - reader.worker = &timedReaderWorker{ - timeout: value, - connection: reader.connection, - } - } else { - reader.worker = &noOpReaderWorker{ - connection: reader.connection, - } - } -} - -type timedReaderWorker struct { - timeout uint32 - connection net.Conn -} - -func (v *timedReaderWorker) Read(p []byte) (int, error) { - deadline := time.Duration(v.timeout) * time.Second - v.connection.SetReadDeadline(time.Now().Add(deadline)) - nBytes, err := v.connection.Read(p) - v.connection.SetReadDeadline(emptyTime) - return nBytes, err -} - -type noOpReaderWorker struct { - connection net.Conn -} - -func (v *noOpReaderWorker) Read(p []byte) (int, error) { - return v.connection.Read(p) -} diff --git a/common/net/timed_io_test.go b/common/net/timed_io_test.go deleted file mode 100644 index 7a5162f44..000000000 --- a/common/net/timed_io_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package net_test - -import ( - "testing" - - . "v2ray.com/core/common/net" - "v2ray.com/core/testing/assert" -) - -func TestTimeOutSettings(t *testing.T) { - assert := assert.On(t) - - reader := NewTimeOutReader(8, nil) - assert.Uint32(reader.GetTimeOut()).Equals(8) - reader.SetTimeOut(8) // no op - assert.Uint32(reader.GetTimeOut()).Equals(8) - reader.SetTimeOut(9) - assert.Uint32(reader.GetTimeOut()).Equals(9) -} diff --git a/common/protocol/user.go b/common/protocol/user.go index f2e09659c..9b34157a1 100644 --- a/common/protocol/user.go +++ b/common/protocol/user.go @@ -1,6 +1,8 @@ package protocol import ( + "time" + "v2ray.com/core/common/errors" ) @@ -30,15 +32,18 @@ func (v *User) GetTypedAccount() (Account, error) { } func (v *User) GetSettings() UserSettings { - settings := UserSettings{ - PayloadReadTimeout: 120, - } - if v.Level > 0 { - settings.PayloadReadTimeout = 0 + settings := UserSettings{} + switch v.Level { + case 0: + settings.PayloadTimeout = time.Second * 30 + case 1: + settings.PayloadTimeout = time.Minute * 2 + default: + settings.PayloadTimeout = time.Minute * 5 } return settings } type UserSettings struct { - PayloadReadTimeout uint32 + PayloadTimeout time.Duration } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 196869def..1c290c228 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -70,15 +70,18 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in } ctx = proxy.ContextWithDestination(ctx, dest) ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + timeout := time.Second * time.Duration(d.config.Timeout) + if timeout == 0 { + timeout = time.Minute * 2 + } + timer := signal.CancelAfterInactivity(ctx, cancel, timeout) inboundRay := d.packetDispatcher.DispatchToOutbound(ctx) requestDone := signal.ExecuteAsync(func() error { defer inboundRay.InboundInput().Close() - timedReader := net.NewTimeOutReader(d.config.Timeout, conn) - chunkReader := buf.NewReader(timedReader) + chunkReader := buf.NewReader(conn) if err := buf.PipeUntilEOF(timer, chunkReader, inboundRay.InboundInput()); err != nil { log.Info("Dokodemo: Failed to transport request: ", err) diff --git a/proxy/http/server.go b/proxy/http/server.go index 4851ff2b2..9a6577f3b 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -82,8 +82,8 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection) error { conn.SetReusable(false) - timedReader := v2net.NewTimeOutReader(s.config.Timeout, conn) - reader := bufio.OriginalReaderSize(timedReader, 2048) + conn.SetReadDeadline(time.Now().Add(time.Second * 8)) + reader := bufio.OriginalReaderSize(conn, 2048) request, err := http.ReadRequest(reader) if err != nil { @@ -93,6 +93,8 @@ func (s *Server) Process(ctx context.Context, network v2net.Network, conn intern return err } log.Info("HTTP: Request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]") + conn.SetReadDeadline(time.Time{}) + defaultPort := v2net.Port(80) if strings.ToLower(request.URL.Scheme) == "https" { defaultPort = v2net.Port(443) @@ -133,7 +135,11 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade } ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + timeout := time.Second * time.Duration(s.config.Timeout) + if timeout == 0 { + timeout = time.Minute * 2 + } + timer := signal.CancelAfterInactivity(ctx, cancel, timeout) ray := s.packetDispatcher.DispatchToOutbound(ctx) requestDone := signal.ExecuteAsync(func() error { diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 9195d8358..32fd16351 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -137,20 +137,18 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection } func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error { - timedReader := net.NewTimeOutReader(16, conn) - bufferedReader := bufio.NewReader(timedReader) + conn.SetReadDeadline(time.Now().Add(time.Second * 8)) + bufferedReader := bufio.NewReader(conn) request, bodyReader, err := ReadTCPSession(s.user, bufferedReader) if err != nil { log.Access(conn.RemoteAddr(), "", log.AccessRejected, err) log.Info("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr(), ": ", err) return err } + conn.SetReadDeadline(time.Time{}) bufferedReader.SetBuffered(false) - userSettings := s.user.GetSettings() - timedReader.SetTimeOut(userSettings.PayloadReadTimeout) - dest := request.Destination() log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "") log.Info("Shadowsocks|Server: Tunnelling request to ", dest) @@ -159,7 +157,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) ctx = protocol.ContextWithUser(ctx, request.User) ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + userSettings := s.user.GetSettings() + timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout) ray := s.packetDispatcher.DispatchToOutbound(ctx) requestDone := signal.ExecuteAsync(func() error { diff --git a/proxy/socks/client.go b/proxy/socks/client.go index eefa2d8f0..7a850277b 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -107,7 +107,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error { } responseFunc = func() error { defer ray.OutboundOutput().Close() - reader := &UDPReader{reader: net.NewTimeOutReader(16, udpConn)} + reader := &UDPReader{reader: udpConn} return buf.PipeUntilEOF(timer, reader, ray.OutboundOutput()) } } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 23df22a3f..417931fc6 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -72,8 +72,8 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet } func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error { - timedReader := net.NewTimeOutReader(16 /* seconds, for handshake */, conn) - reader := bufio.NewReader(timedReader) + conn.SetReadDeadline(time.Now().Add(time.Second * 8)) + reader := bufio.NewReader(conn) inboundDest := proxy.InboundDestinationFromContext(ctx) session := &ServerSession{ @@ -88,13 +88,13 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error log.Info("Socks|Server: Failed to read request: ", err) return err } + conn.SetReadDeadline(time.Time{}) if request.Command == protocol.RequestCommandTCP { dest := request.Destination() log.Info("Socks|Server: TCP Connect request to ", dest) log.Access(source, dest, log.AccessAccepted, "") - timedReader.SetTimeOut(s.config.Timeout) ctx = proxy.ContextWithDestination(ctx, dest) return s.transport(ctx, reader, conn) } @@ -117,7 +117,11 @@ func (*Server) handleUDP() error { func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer) error { ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + timeout := time.Second * time.Duration(v.config.Timeout) + if timeout == 0 { + timeout = time.Minute * 2 + } + timer := signal.CancelAfterInactivity(ctx, cancel, timeout) ray := v.packetDispatcher.DispatchToOutbound(ctx) input := ray.InboundInput() diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index f220e823b..d6d50613f 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -178,8 +178,8 @@ func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSessi } func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection) error { - connReader := net.NewTimeOutReader(8, connection) - reader := bufio.NewReader(connReader) + connection.SetReadDeadline(time.Now().Add(time.Second * 8)) + reader := bufio.NewReader(connection) session := encoding.NewServerSession(v.clients) request, err := session.DecodeRequestHeader(reader) @@ -195,19 +195,20 @@ func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "") log.Info("VMess|Inbound: Received request for ", request.Destination()) + connection.SetReadDeadline(time.Time{}) + connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) + userSettings := request.User.GetSettings() ctx = proxy.ContextWithDestination(ctx, request.Destination()) ctx = protocol.ContextWithUser(ctx, request.User) ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout) ray := v.packetDispatcher.DispatchToOutbound(ctx) input := ray.InboundInput() output := ray.InboundOutput() - userSettings := request.User.GetSettings() - connReader.SetTimeOut(userSettings.PayloadReadTimeout) reader.SetBuffered(false) requestDone := signal.ExecuteAsync(func() error {