1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-11-12 07:19:01 -05:00

remove timeout reader

This commit is contained in:
Darien Raymond 2017-01-31 16:49:59 +01:00
parent df44a53ca0
commit ad7d98473f
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
8 changed files with 43 additions and 113 deletions

View File

@ -1,72 +0,0 @@
package net
import (
"io"
"net"
"time"
)
var (
emptyTime time.Time
)
type TimeOutReader struct {
timeout uint32
connection net.Conn
worker io.Reader
}
func NewTimeOutReader(timeout uint32 /* seconds */, connection net.Conn) *TimeOutReader {
reader := &TimeOutReader{
connection: connection,
timeout: 0,
}
reader.SetTimeOut(timeout)
return reader
}
func (reader *TimeOutReader) Read(p []byte) (int, error) {
return reader.worker.Read(p)
}
func (reader *TimeOutReader) GetTimeOut() uint32 {
return reader.timeout
}
func (reader *TimeOutReader) SetTimeOut(value uint32) {
if reader.worker != nil && value == reader.timeout {
return
}
reader.timeout = value
if value > 0 {
reader.worker = &timedReaderWorker{
timeout: value,
connection: reader.connection,
}
} else {
reader.worker = &noOpReaderWorker{
connection: reader.connection,
}
}
}
type timedReaderWorker struct {
timeout uint32
connection net.Conn
}
func (v *timedReaderWorker) Read(p []byte) (int, error) {
deadline := time.Duration(v.timeout) * time.Second
v.connection.SetReadDeadline(time.Now().Add(deadline))
nBytes, err := v.connection.Read(p)
v.connection.SetReadDeadline(emptyTime)
return nBytes, err
}
type noOpReaderWorker struct {
connection net.Conn
}
func (v *noOpReaderWorker) Read(p []byte) (int, error) {
return v.connection.Read(p)
}

View File

@ -1,19 +0,0 @@
package net_test
import (
"testing"
. "v2ray.com/core/common/net"
"v2ray.com/core/testing/assert"
)
func TestTimeOutSettings(t *testing.T) {
assert := assert.On(t)
reader := NewTimeOutReader(8, nil)
assert.Uint32(reader.GetTimeOut()).Equals(8)
reader.SetTimeOut(8) // no op
assert.Uint32(reader.GetTimeOut()).Equals(8)
reader.SetTimeOut(9)
assert.Uint32(reader.GetTimeOut()).Equals(9)
}

View File

@ -70,15 +70,18 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
}
ctx = proxy.ContextWithDestination(ctx, dest)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
timeout := time.Second * time.Duration(d.config.Timeout)
if timeout == 0 {
timeout = time.Minute * 2
}
timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
inboundRay := d.packetDispatcher.DispatchToOutbound(ctx)
requestDone := signal.ExecuteAsync(func() error {
defer inboundRay.InboundInput().Close()
timedReader := net.NewTimeOutReader(d.config.Timeout, conn)
chunkReader := buf.NewReader(timedReader)
chunkReader := buf.NewReader(conn)
if err := buf.PipeUntilEOF(timer, chunkReader, inboundRay.InboundInput()); err != nil {
log.Info("Dokodemo: Failed to transport request: ", err)

View File

@ -82,8 +82,8 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection) error {
conn.SetReusable(false)
timedReader := v2net.NewTimeOutReader(s.config.Timeout, conn)
reader := bufio.OriginalReaderSize(timedReader, 2048)
conn.SetReadDeadline(time.Now().Add(time.Second * 8))
reader := bufio.OriginalReaderSize(conn, 2048)
request, err := http.ReadRequest(reader)
if err != nil {
@ -93,6 +93,8 @@ func (s *Server) Process(ctx context.Context, network v2net.Network, conn intern
return err
}
log.Info("HTTP: Request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]")
conn.SetReadDeadline(time.Time{})
defaultPort := v2net.Port(80)
if strings.ToLower(request.URL.Scheme) == "https" {
defaultPort = v2net.Port(443)
@ -133,7 +135,11 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
}
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
timeout := time.Second * time.Duration(s.config.Timeout)
if timeout == 0 {
timeout = time.Minute * 2
}
timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
ray := s.packetDispatcher.DispatchToOutbound(ctx)
requestDone := signal.ExecuteAsync(func() error {

View File

@ -137,20 +137,18 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
}
func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error {
timedReader := net.NewTimeOutReader(16, conn)
bufferedReader := bufio.NewReader(timedReader)
conn.SetReadDeadline(time.Now().Add(time.Second * 8))
bufferedReader := bufio.NewReader(conn)
request, bodyReader, err := ReadTCPSession(s.user, bufferedReader)
if err != nil {
log.Access(conn.RemoteAddr(), "", log.AccessRejected, err)
log.Info("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr(), ": ", err)
return err
}
conn.SetReadDeadline(time.Time{})
bufferedReader.SetBuffered(false)
userSettings := s.user.GetSettings()
timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
dest := request.Destination()
log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "")
log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
@ -159,7 +157,12 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection)
ctx = protocol.ContextWithUser(ctx, request.User)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
userSettings := s.user.GetSettings()
timeout := time.Second * time.Duration(userSettings.PayloadReadTimeout)
if timeout == 0 {
timeout = time.Minute * 2
}
timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
ray := s.packetDispatcher.DispatchToOutbound(ctx)
requestDone := signal.ExecuteAsync(func() error {

View File

@ -107,7 +107,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error {
}
responseFunc = func() error {
defer ray.OutboundOutput().Close()
reader := &UDPReader{reader: net.NewTimeOutReader(16, udpConn)}
reader := &UDPReader{reader: udpConn}
return buf.PipeUntilEOF(timer, reader, ray.OutboundOutput())
}
}

View File

@ -72,8 +72,8 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
}
func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error {
timedReader := net.NewTimeOutReader(16 /* seconds, for handshake */, conn)
reader := bufio.NewReader(timedReader)
conn.SetReadDeadline(time.Now().Add(time.Second * 8))
reader := bufio.NewReader(conn)
inboundDest := proxy.InboundDestinationFromContext(ctx)
session := &ServerSession{
@ -88,13 +88,13 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error
log.Info("Socks|Server: Failed to read request: ", err)
return err
}
conn.SetReadDeadline(time.Time{})
if request.Command == protocol.RequestCommandTCP {
dest := request.Destination()
log.Info("Socks|Server: TCP Connect request to ", dest)
log.Access(source, dest, log.AccessAccepted, "")
timedReader.SetTimeOut(s.config.Timeout)
ctx = proxy.ContextWithDestination(ctx, dest)
return s.transport(ctx, reader, conn)
}
@ -117,7 +117,11 @@ func (*Server) handleUDP() error {
func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer) error {
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
timeout := time.Second * time.Duration(v.config.Timeout)
if timeout == 0 {
timeout = time.Minute * 2
}
timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
ray := v.packetDispatcher.DispatchToOutbound(ctx)
input := ray.InboundInput()

View File

@ -178,8 +178,8 @@ func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSessi
}
func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection) error {
connReader := net.NewTimeOutReader(8, connection)
reader := bufio.NewReader(connReader)
connection.SetReadDeadline(time.Now().Add(time.Second * 8))
reader := bufio.NewReader(connection)
session := encoding.NewServerSession(v.clients)
request, err := session.DecodeRequestHeader(reader)
@ -195,19 +195,24 @@ func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network,
log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
log.Info("VMess|Inbound: Received request for ", request.Destination())
connection.SetReadDeadline(time.Time{})
connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
userSettings := request.User.GetSettings()
ctx = proxy.ContextWithDestination(ctx, request.Destination())
ctx = protocol.ContextWithUser(ctx, request.User)
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
timeout := time.Second * time.Duration(userSettings.PayloadReadTimeout)
if timeout == 0 {
timeout = time.Minute * 2
}
timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
ray := v.packetDispatcher.DispatchToOutbound(ctx)
input := ray.InboundInput()
output := ray.InboundOutput()
userSettings := request.User.GetSettings()
connReader.SetTimeOut(userSettings.PayloadReadTimeout)
reader.SetBuffered(false)
requestDone := signal.ExecuteAsync(func() error {