From d7def8bf4781228d6644d6e29a052bff0b033d3e Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 4 Jul 2018 18:58:19 +0200 Subject: [PATCH] apply policy in mtproto inbound --- proxy/mtproto/server.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/proxy/mtproto/server.go b/proxy/mtproto/server.go index 48ef8f736..eba1c684f 100644 --- a/proxy/mtproto/server.go +++ b/proxy/mtproto/server.go @@ -2,6 +2,7 @@ package mtproto import ( "context" + "time" "v2ray.com/core" "v2ray.com/core/common" @@ -10,6 +11,8 @@ import ( "v2ray.com/core/common/net" "v2ray.com/core/common/predicate" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/session" + "v2ray.com/core/common/signal" "v2ray.com/core/common/task" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/pipe" @@ -28,6 +31,7 @@ var ( type Server struct { user *protocol.User account *Account + policy core.PolicyManager } func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { @@ -45,9 +49,12 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { return nil, newError("not a MTProto account") } + v := core.MustFromContext(ctx) + return &Server{ user: user, account: account, + policy: v.PolicyManager(), }, nil } @@ -58,12 +65,21 @@ func (s *Server) Network() net.NetworkList { } func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { + sPolicy := s.policy.ForLevel(s.user.Level) + + if err := conn.SetDeadline(time.Now().Add(sPolicy.Timeouts.Handshake)); err != nil { + newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) + } auth, err := ReadAuthentication(conn) if err != nil { return newError("failed to read authentication header").Base(err) } defer putAuthenticationObject(auth) + if err := conn.SetDeadline(time.Time{}); err != nil { + newError("failed to clear deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) + } + auth.ApplySecret(s.account.Secret) decryptor := crypto.NewAesCTRStream(auth.DecodingKey[:], auth.DecodingNonce[:]) @@ -83,17 +99,26 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet Address: dcList[dcID], Port: net.Port(443), } + + ctx, cancel := context.WithCancel(ctx) + timer := signal.CancelAfterInactivity(ctx, cancel, sPolicy.Timeouts.ConnectionIdle) + ctx = core.ContextWithBufferPolicy(ctx, sPolicy.Buffer) + link, err := dispatcher.Dispatch(ctx, dest) if err != nil { return newError("failed to dispatch request to: ", dest).Base(err) } request := func() error { + defer timer.SetTimeout(sPolicy.Timeouts.DownlinkOnly) + reader := buf.NewReader(crypto.NewCryptionReader(decryptor, conn)) return buf.Copy(reader, link.Writer) } response := func() error { + defer timer.SetTimeout(sPolicy.Timeouts.UplinkOnly) + encryptor := crypto.NewAesCTRStream(auth.EncodingKey[:], auth.EncodingNonce[:]) writer := buf.NewWriter(crypto.NewCryptionWriter(encryptor, conn)) return buf.Copy(link.Reader, writer)