1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-02 07:26:24 -05:00

more accurate memory usage when buffer = 0

This commit is contained in:
Darien Raymond 2018-07-31 16:05:57 +02:00
parent 73a1111083
commit 5e65d7da6d
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
10 changed files with 80 additions and 22 deletions

View File

@ -92,9 +92,14 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
requestDone := func() error { requestDone := func() error {
defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
chunkReader := buf.NewReader(conn) var reader buf.Reader
if plcy.Buffer.PerConnection == 0 {
reader = &buf.SingleReader{Reader: conn}
} else {
reader = buf.NewReader(conn)
}
if err := buf.Copy(chunkReader, link.Writer, buf.UpdateActivity(timer)); err != nil { if err := buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport request").Base(err) return newError("failed to transport request").Base(err)
} }

View File

@ -118,11 +118,12 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
} }
defer conn.Close() // nolint: errcheck defer conn.Close() // nolint: errcheck
plcy := h.policy()
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, h.policy().Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle)
requestDone := func() error { requestDone := func() error {
defer timer.SetTimeout(h.policy().Timeouts.DownlinkOnly) defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
var writer buf.Writer var writer buf.Writer
if destination.Network == net.Network_TCP { if destination.Network == net.Network_TCP {
@ -138,10 +139,15 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
} }
responseDone := func() error { responseDone := func() error {
defer timer.SetTimeout(h.policy().Timeouts.UplinkOnly) defer timer.SetTimeout(plcy.Timeouts.UplinkOnly)
v2reader := buf.NewReader(conn) var reader buf.Reader
if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil { if plcy.Buffer.PerConnection == 0 {
reader = &buf.SingleReader{Reader: conn}
} else {
reader = buf.NewReader(conn)
}
if err := buf.Copy(reader, output, buf.UpdateActivity(timer)); err != nil {
return newError("failed to process response").Base(err) return newError("failed to process response").Base(err)
} }

View File

@ -194,14 +194,19 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
} }
requestDone := func() error { requestDone := func() error {
defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly) defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
v2reader := buf.NewReader(conn) var reader buf.Reader
return buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)) if plcy.Buffer.PerConnection == 0 {
reader = &buf.SingleReader{Reader: conn}
} else {
reader = buf.NewReader(conn)
}
return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer))
} }
responseDone := func() error { responseDone := func() error {
defer timer.SetTimeout(s.policy().Timeouts.UplinkOnly) defer timer.SetTimeout(plcy.Timeouts.UplinkOnly)
v2writer := buf.NewWriter(conn) v2writer := buf.NewWriter(conn)
if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil { if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil {

View File

@ -149,7 +149,17 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level) sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level)
conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)) conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake))
bufferedReader := buf.BufferedReader{Reader: buf.NewReader(conn)}
var bufferedReader buf.BufferedReader
{
var reader buf.Reader
if sessionPolicy.Buffer.PerConnection == 0 {
reader = &buf.SingleReader{Reader: conn}
} else {
reader = buf.NewReader(conn)
}
bufferedReader = buf.BufferedReader{Reader: reader}
}
request, bodyReader, err := ReadTCPSession(s.user, &bufferedReader) request, bodyReader, err := ReadTCPSession(s.user, &bufferedReader)
if err != nil { if err != nil {
log.Record(&log.AccessMessage{ log.Record(&log.AccessMessage{

View File

@ -113,7 +113,13 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
} }
responseFunc = func() error { responseFunc = func() error {
defer timer.SetTimeout(p.Timeouts.UplinkOnly) defer timer.SetTimeout(p.Timeouts.UplinkOnly)
return buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer)) var reader buf.Reader
if p.Buffer.PerConnection == 0 {
reader = &buf.SingleReader{Reader: conn}
} else {
reader = buf.NewReader(conn)
}
return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer))
} }
} else if request.Command == protocol.RequestCommandUDP { } else if request.Command == protocol.RequestCommandUDP {
udpConn, err := dialer.Dial(ctx, udpRequest.Destination()) udpConn, err := dialer.Dial(ctx, udpRequest.Destination())

View File

@ -68,11 +68,21 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
} }
func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil { plcy := s.policy()
if err := conn.SetReadDeadline(time.Now().Add(plcy.Timeouts.Handshake)); err != nil {
newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} var reader *buf.BufferedReader
{
var r buf.Reader
if plcy.Buffer.PerConnection == 0 {
r = &buf.SingleReader{Reader: conn}
} else {
r = buf.NewReader(conn)
}
reader = &buf.BufferedReader{Reader: r}
}
inboundDest, ok := proxy.InboundEntryPointFromContext(ctx) inboundDest, ok := proxy.InboundEntryPointFromContext(ctx)
if !ok { if !ok {
@ -141,9 +151,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
requestDone := func() error { requestDone := func() error {
defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly)
if err := buf.Copy(buf.NewReader(reader), link.Writer, buf.UpdateActivity(timer)); err != nil {
v2reader := buf.NewReader(reader)
if err := buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport all TCP request").Base(err) return newError("failed to transport all TCP request").Base(err)
} }

View File

@ -140,7 +140,7 @@ func (c *ClientSession) EncodeRequestBody(request *protocol.RequestHeader, write
return crypto.NewAuthenticationWriter(auth, sizeParser, cryptionWriter, request.Command.TransferType(), padding) return crypto.NewAuthenticationWriter(auth, sizeParser, cryptionWriter, request.Command.TransferType(), padding)
} }
return buf.NewWriter(cryptionWriter) return &buf.SequentialWriter{Writer: cryptionWriter}
case protocol.SecurityType_AES128_GCM: case protocol.SecurityType_AES128_GCM:
block, _ := aes.NewCipher(c.requestBodyKey[:]) block, _ := aes.NewCipher(c.requestBodyKey[:])
aead, _ := cipher.NewGCM(block) aead, _ := cipher.NewGCM(block)

View File

@ -332,7 +332,7 @@ func (s *ServerSession) EncodeResponseBody(request *protocol.RequestHeader, writ
return crypto.NewAuthenticationWriter(auth, sizeParser, s.responseWriter, request.Command.TransferType(), padding) return crypto.NewAuthenticationWriter(auth, sizeParser, s.responseWriter, request.Command.TransferType(), padding)
} }
return buf.NewWriter(s.responseWriter) return &buf.SequentialWriter{Writer: s.responseWriter}
case protocol.SecurityType_AES128_GCM: case protocol.SecurityType_AES128_GCM:
block, _ := aes.NewCipher(s.responseBodyKey[:]) block, _ := aes.NewCipher(s.responseBodyKey[:])
aead, _ := cipher.NewGCM(block) aead, _ := cipher.NewGCM(block)

View File

@ -224,7 +224,16 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
return newError("unable to set read deadline").Base(err).AtWarning() return newError("unable to set read deadline").Base(err).AtWarning()
} }
reader := &buf.BufferedReader{Reader: buf.NewReader(connection)} var reader *buf.BufferedReader
{
var r buf.Reader
if sessionPolicy.Buffer.PerConnection == 0 {
r = &buf.SingleReader{Reader: connection}
} else {
r = buf.NewReader(connection)
}
reader = &buf.BufferedReader{Reader: r}
}
svrSession := encoding.NewServerSession(h.clients, h.sessionHistory) svrSession := encoding.NewServerSession(h.clients, h.sessionHistory)
request, err := svrSession.DecodeRequestHeader(reader) request, err := svrSession.DecodeRequestHeader(reader)

View File

@ -144,7 +144,16 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
responseDone := func() error { responseDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} var reader *buf.BufferedReader
{
var r buf.Reader
if sessionPolicy.Buffer.PerConnection == 0 {
r = &buf.SingleReader{Reader: conn}
} else {
r = buf.NewReader(conn)
}
reader = &buf.BufferedReader{Reader: r}
}
header, err := session.DecodeResponseHeader(reader) header, err := session.DecodeResponseHeader(reader)
if err != nil { if err != nil {
return newError("failed to read header").Base(err) return newError("failed to read header").Base(err)