From 9977eadf8c616b34c86cdb16ef58bbc2dbe2b406 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Mon, 25 Jun 2018 01:09:02 +0200 Subject: [PATCH] remove dependency from errors to session --- app/dispatcher/default.go | 9 +-- app/proxyman/inbound/inbound.go | 3 +- app/proxyman/inbound/worker.go | 6 +- app/proxyman/mux/mux.go | 13 +++-- app/proxyman/outbound/handler.go | 9 +-- common/errors/errors.go | 77 +++++++++----------------- common/session/request.go | 6 ++ common/session/session.go | 9 +++ proxy/dokodemo/dokodemo.go | 3 +- proxy/freedom/freedom.go | 7 ++- proxy/http/server.go | 9 +-- proxy/shadowsocks/client.go | 3 +- proxy/shadowsocks/server.go | 13 +++-- proxy/socks/client.go | 7 ++- proxy/socks/server.go | 24 ++++---- proxy/vmess/inbound/inbound.go | 17 +++--- proxy/vmess/outbound/outbound.go | 3 +- transport/internet/tcp/dialer.go | 3 +- transport/internet/tcp/hub.go | 3 +- transport/internet/udp/dispatcher.go | 7 ++- transport/internet/websocket/dialer.go | 3 +- 21 files changed, 119 insertions(+), 115 deletions(-) create mode 100644 common/session/request.go diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index b66eef49e..ca2e6f0ae 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -12,6 +12,7 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/session" "v2ray.com/core/common/stats" "v2ray.com/core/proxy" "v2ray.com/core/transport/pipe" @@ -143,7 +144,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin outbound.Reader = cReader domain, err := sniffer(ctx, snifferList, cReader) if err == nil { - newError("sniffed domain: ", domain).WithContext(ctx).WriteToLog() + newError("sniffed domain: ", domain).WriteToLog(session.ExportIDToError(ctx)) destination.Address = net.ParseAddress(domain) ctx = proxy.ContextWithTarget(ctx, destination) } @@ -189,13 +190,13 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link, if d.router != nil { if tag, err := d.router.PickRoute(ctx); err == nil { if handler := d.ohm.GetHandler(tag); handler != nil { - newError("taking detour [", tag, "] for [", destination, "]").WithContext(ctx).WriteToLog() + newError("taking detour [", tag, "] for [", destination, "]").WriteToLog(session.ExportIDToError(ctx)) dispatcher = handler } else { - newError("non existing tag: ", tag).AtWarning().WithContext(ctx).WriteToLog() + newError("non existing tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx)) } } else { - newError("default route for ", destination).WithContext(ctx).WriteToLog() + newError("default route for ", destination).WriteToLog(session.ExportIDToError(ctx)) } } dispatcher.Dispatch(ctx, link) diff --git a/app/proxyman/inbound/inbound.go b/app/proxyman/inbound/inbound.go index f251d2e9c..888564ece 100644 --- a/app/proxyman/inbound/inbound.go +++ b/app/proxyman/inbound/inbound.go @@ -10,6 +10,7 @@ import ( "v2ray.com/core/app/proxyman" "v2ray.com/core/common" "v2ray.com/core/common/serial" + "v2ray.com/core/common/session" ) // Manager is to manage all inbound handlers. @@ -74,7 +75,7 @@ func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { if handler, found := m.taggedHandlers[tag]; found { if err := handler.Close(); err != nil { - newError("failed to close handler ", tag).Base(err).AtWarning().WithContext(ctx).WriteToLog() + newError("failed to close handler ", tag).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx)) } delete(m.taggedHandlers, tag) return nil diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 2895f6269..2a97d7736 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -52,7 +52,7 @@ func (w *tcpWorker) callback(conn internet.Connection) { if w.recvOrigDest { dest, err := tcp.GetOriginalDestination(conn) if err != nil { - newError("failed to get original destination").WithContext(ctx).Base(err).WriteToLog() + newError("failed to get original destination").Base(err).WriteToLog(session.ExportIDToError(ctx)) } if dest.IsValid() { ctx = proxy.ContextWithOriginalTarget(ctx, dest) @@ -74,11 +74,11 @@ func (w *tcpWorker) callback(conn internet.Connection) { } } if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil { - newError("connection ends").Base(err).WithContext(ctx).WriteToLog() + newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx)) } cancel() if err := conn.Close(); err != nil { - newError("failed to close connection").Base(err).WithContext(ctx).WriteToLog() + newError("failed to close connection").Base(err).WriteToLog(session.ExportIDToError(ctx)) } } diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index b1e989465..02c4b5c4d 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -16,6 +16,7 @@ import ( "v2ray.com/core/common/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal/done" "v2ray.com/core/proxy" "v2ray.com/core/transport/pipe" @@ -170,10 +171,10 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { defer s.Close() // nolint: errcheck defer writer.Close() // nolint: errcheck - newError("dispatching request to ", dest).WithContext(ctx).WriteToLog() + newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx)) if pReader, ok := s.input.(*pipe.Reader); ok { if err := copyFirstPayload(pReader, writer); err != nil { - newError("failed to fetch first payload").Base(err).WithContext(ctx).WriteToLog() + newError("failed to fetch first payload").Base(err).WriteToLog(session.ExportIDToError(ctx)) writer.hasError = true pipe.CloseError(s.input) return @@ -181,7 +182,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { } if err := buf.Copy(s.input, writer); err != nil { - newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog() + newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx)) writer.hasError = true pipe.CloseError(s.input) return @@ -346,7 +347,7 @@ type ServerWorker struct { func handle(ctx context.Context, s *Session, output buf.Writer) { writer := NewResponseWriter(s.ID, output, s.transferType) if err := buf.Copy(s.input, writer); err != nil { - newError("session ", s.ID, " ends.").Base(err).WithContext(ctx).WriteToLog() + newError("session ", s.ID, " ends.").Base(err).WriteToLog(session.ExportIDToError(ctx)) writer.hasError = true } @@ -362,7 +363,7 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu } func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error { - newError("received request for ", meta.Target).WithContext(ctx).WriteToLog() + newError("received request for ", meta.Target).WriteToLog(session.ExportIDToError(ctx)) { msg := &log.AccessMessage{ To: meta.Target, @@ -475,7 +476,7 @@ func (w *ServerWorker) run(ctx context.Context) { err := w.handleFrame(ctx, reader) if err != nil { if errors.Cause(err) != io.EOF { - newError("unexpected EOF").Base(err).WithContext(ctx).WriteToLog() + newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx)) pipe.CloseError(input) } return diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 1a338559e..bb3ccfe5c 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -8,6 +8,7 @@ import ( "v2ray.com/core/app/proxyman/mux" "v2ray.com/core/common" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/pipe" @@ -77,13 +78,13 @@ func (h *Handler) Tag() string { func (h *Handler) Dispatch(ctx context.Context, link *core.Link) { if h.mux != nil { if err := h.mux.Dispatch(ctx, link); err != nil { - newError("failed to process mux outbound traffic").Base(err).WithContext(ctx).WriteToLog() + newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) pipe.CloseError(link.Writer) } } else { if err := h.proxy.Process(ctx, link, h); err != nil { // Ensure outbound ray is properly closed. - newError("failed to process outbound traffic").Base(err).WithContext(ctx).WriteToLog() + newError("failed to process outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) pipe.CloseError(link.Writer) } else { common.Must(common.Close(link.Writer)) @@ -99,7 +100,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn tag := h.senderSettings.ProxySettings.Tag handler := h.outboundManager.GetHandler(tag) if handler != nil { - newError("proxying to ", tag, " for dest ", dest).AtDebug().WithContext(ctx).WriteToLog() + newError("proxying to ", tag, " for dest ", dest).AtDebug().WriteToLog(session.ExportIDToError(ctx)) ctx = proxy.ContextWithTarget(ctx, dest) opts := pipe.OptionsFromContext(ctx) @@ -110,7 +111,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil } - newError("failed to get outbound handler with tag: ", tag).AtWarning().WithContext(ctx).WriteToLog() + newError("failed to get outbound handler with tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx)) } if h.senderSettings.Via != nil { diff --git a/common/errors/errors.go b/common/errors/errors.go index b9b354157..8a487c288 100644 --- a/common/errors/errors.go +++ b/common/errors/errors.go @@ -2,13 +2,11 @@ package errors // import "v2ray.com/core/common/errors" import ( - "context" "os" "strings" "v2ray.com/core/common/log" "v2ray.com/core/common/serial" - "v2ray.com/core/common/session" ) type hasInnerError interface { @@ -20,17 +18,13 @@ type hasSeverity interface { Severity() log.Severity } -type hasContext interface { - Context() context.Context -} - // Error is an error object with underlying error. type Error struct { + prefix []interface{} + path []string message []interface{} inner error severity log.Severity - path []string - ctx context.Context } // Error implements error.Error(). @@ -42,7 +36,13 @@ func (v *Error) Error() string { if len(v.path) > 0 { msg = strings.Join(v.path, "|") + ": " + msg } - return msg + + var prefix string + for _, p := range v.prefix { + prefix += "[" + serial.ToString(p) + "] " + } + + return prefix + msg } // Inner implements hasInnerError.Inner() @@ -58,28 +58,6 @@ func (v *Error) Base(err error) *Error { return v } -func (v *Error) WithContext(ctx context.Context) *Error { - v.ctx = ctx - return v -} - -// Context returns the context that associated with the Error. -func (v *Error) Context() context.Context { - if v.ctx != nil { - return v.ctx - } - - if v.inner == nil { - return nil - } - - if c, ok := v.inner.(hasContext); ok { - return c.Context() - } - - return nil -} - func (v *Error) atSeverity(s log.Severity) *Error { v.severity = s return v @@ -132,25 +110,29 @@ func (v *Error) String() string { } // WriteToLog writes current error into log. -func (v *Error) WriteToLog() { - ctx := v.Context() - var sid session.ID - if ctx != nil { - sid = session.IDFromContext(ctx) +func (v *Error) WriteToLog(opts ...ExportOption) { + var holder ExportOptionHolder + + for _, opt := range opts { + opt(&holder) } - var c interface{} = v - if sid > 0 { - c = sessionLog{ - id: sid, - content: v, - } + + if holder.SessionID > 0 { + v.prefix = append(v.prefix, holder.SessionID) } + log.Record(&log.GeneralMessage{ Severity: GetSeverity(v), - Content: c, + Content: v, }) } +type ExportOptionHolder struct { + SessionID uint32 +} + +type ExportOption func(*ExportOptionHolder) + // New returns a new error object with message formed from given arguments. func New(msg ...interface{}) *Error { return &Error{ @@ -196,12 +178,3 @@ func GetSeverity(err error) log.Severity { } return log.Severity_Info } - -type sessionLog struct { - id session.ID - content interface{} -} - -func (s sessionLog) String() string { - return serial.Concat("[", s.id, "] ", s.content) -} diff --git a/common/session/request.go b/common/session/request.go new file mode 100644 index 000000000..2b54047aa --- /dev/null +++ b/common/session/request.go @@ -0,0 +1,6 @@ +package session + +type Request struct { + //Destination net.Destination + DecodedLayers []interface{} +} diff --git a/common/session/session.go b/common/session/session.go index e1be95766..b7b9042e1 100644 --- a/common/session/session.go +++ b/common/session/session.go @@ -4,6 +4,8 @@ package session // import "v2ray.com/core/common/session" import ( "context" "math/rand" + + "v2ray.com/core/common/errors" ) // ID of a session. @@ -38,3 +40,10 @@ func IDFromContext(ctx context.Context) ID { } return 0 } + +func ExportIDToError(ctx context.Context) errors.ExportOption { + id := IDFromContext(ctx) + return func(h *errors.ExportOptionHolder) { + h.SessionID = uint32(id) + } +} diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 40bf3289d..e4f7154ba 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -10,6 +10,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" "v2ray.com/core/proxy" @@ -54,7 +55,7 @@ func (d *DokodemoDoor) policy() core.Policy { } func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { - newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WithContext(ctx).WriteToLog() + newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog(session.ExportIDToError(ctx)) dest := net.Destination{ Network: network, Address: d.address, diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 91a68e071..c5ce3eda1 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -12,6 +12,7 @@ import ( "v2ray.com/core/common/dice" "v2ray.com/core/common/net" "v2ray.com/core/common/retry" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" "v2ray.com/core/proxy" @@ -56,7 +57,7 @@ func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address { ips, err := h.dns.LookupIP(domain) if err != nil { - newError("failed to get IP address for domain ", domain).Base(err).WithContext(ctx).WriteToLog() + newError("failed to get IP address for domain ", domain).Base(err).WriteToLog(session.ExportIDToError(ctx)) } if len(ips) == 0 { return nil @@ -75,7 +76,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia Port: net.Port(server.Port), } } - newError("opening connection to ", destination).WithContext(ctx).WriteToLog() + newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx)) input := link.Reader output := link.Writer @@ -88,7 +89,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia Address: ip, Port: destination.Port, } - newError("changing destination to ", destination).WithContext(ctx).WriteToLog() + newError("changing destination to ", destination).WriteToLog(session.ExportIDToError(ctx)) } } diff --git a/proxy/http/server.go b/proxy/http/server.go index f09ae875f..de96e9298 100755 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -17,6 +17,7 @@ import ( "v2ray.com/core/common/log" "v2ray.com/core/common/net" http_proto "v2ray.com/core/common/protocol/http" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" "v2ray.com/core/transport/internet" @@ -105,7 +106,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet Start: if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil { - newError("failed to set read deadline").Base(err).WithContext(ctx).WriteToLog() + newError("failed to set read deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) } request, err := http.ReadRequest(reader) @@ -124,9 +125,9 @@ Start: } } - newError("request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]").WithContext(ctx).WriteToLog() + newError("request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]").WriteToLog(session.ExportIDToError(ctx)) if err := conn.SetReadDeadline(time.Time{}); err != nil { - newError("failed to clear read deadline").Base(err).WithContext(ctx).WriteToLog() + newError("failed to clear read deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) } defaultPort := net.Port(80) @@ -285,7 +286,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri result = nil } } else { - newError("failed to read response from ", request.Host).Base(err).AtWarning().WithContext(ctx).WriteToLog() + newError("failed to read response from ", request.Host).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx)) response = &http.Response{ Status: "Service Unavailable", StatusCode: 503, diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 3e8309448..8b1478a28 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -3,6 +3,7 @@ package shadowsocks import ( "context" + "v2ray.com/core/common/session" "v2ray.com/core/common/task" "v2ray.com/core" @@ -64,7 +65,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial if err != nil { return newError("failed to find an available destination").AtWarning().Base(err) } - newError("tunneling request to ", destination, " via ", server.Destination()).WithContext(ctx).WriteToLog() + newError("tunneling request to ", destination, " via ", server.Destination()).WriteToLog(session.ExportIDToError(ctx)) defer conn.Close() diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index a8558fb0d..8c5b8961d 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -4,6 +4,7 @@ import ( "context" "time" + "v2ray.com/core/common/session" "v2ray.com/core/common/task" "v2ray.com/core" @@ -86,7 +87,7 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection request, data, err := DecodeUDPPacket(s.user, payload) if err != nil { if source, ok := proxy.SourceFromContext(ctx); ok { - newError("dropping invalid UDP packet from: ", source).Base(err).WithContext(ctx).WriteToLog() + newError("dropping invalid UDP packet from: ", source).Base(err).WriteToLog(session.ExportIDToError(ctx)) log.Record(&log.AccessMessage{ From: source, To: "", @@ -99,13 +100,13 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection } if request.Option.Has(RequestOptionOneTimeAuth) && s.account.OneTimeAuth == Account_Disabled { - newError("client payload enables OTA but server doesn't allow it").WithContext(ctx).WriteToLog() + newError("client payload enables OTA but server doesn't allow it").WriteToLog(session.ExportIDToError(ctx)) payload.Release() continue } if !request.Option.Has(RequestOptionOneTimeAuth) && s.account.OneTimeAuth == Account_Enabled { - newError("client payload disables OTA but server forces it").WithContext(ctx).WriteToLog() + newError("client payload disables OTA but server forces it").WriteToLog(session.ExportIDToError(ctx)) payload.Release() continue } @@ -119,14 +120,14 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection Reason: "", }) } - newError("tunnelling request to ", dest).WithContext(ctx).WriteToLog() + newError("tunnelling request to ", dest).WriteToLog(session.ExportIDToError(ctx)) ctx = protocol.ContextWithUser(ctx, request.User) udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) { data, err := EncodeUDPPacket(request, payload.Bytes()) payload.Release() if err != nil { - newError("failed to encode UDP packet").Base(err).AtWarning().WithContext(ctx).WriteToLog() + newError("failed to encode UDP packet").Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx)) return } defer data.Release() @@ -164,7 +165,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, Status: log.AccessAccepted, Reason: "", }) - newError("tunnelling request to ", dest).WithContext(ctx).WriteToLog() + newError("tunnelling request to ", dest).WriteToLog(session.ExportIDToError(ctx)) ctx = protocol.ContextWithUser(ctx, request.User) diff --git a/proxy/socks/client.go b/proxy/socks/client.go index ede7e6b49..8765b94e7 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -4,6 +4,7 @@ import ( "context" "time" + "v2ray.com/core/common/session" "v2ray.com/core/common/task" "v2ray.com/core" @@ -66,7 +67,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial defer func() { if err := conn.Close(); err != nil { - newError("failed to closed connection").Base(err).WithContext(ctx).WriteToLog() + newError("failed to closed connection").Base(err).WriteToLog(session.ExportIDToError(ctx)) } }() @@ -89,7 +90,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial } if err := conn.SetDeadline(time.Now().Add(p.Timeouts.Handshake)); err != nil { - newError("failed to set deadline for handshake").Base(err).WithContext(ctx).WriteToLog() + newError("failed to set deadline for handshake").Base(err).WriteToLog(session.ExportIDToError(ctx)) } udpRequest, err := ClientHandshake(request, conn, conn) if err != nil { @@ -97,7 +98,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial } if err := conn.SetDeadline(time.Time{}); err != nil { - newError("failed to clear deadline after handshake").Base(err).WithContext(ctx).WriteToLog() + newError("failed to clear deadline after handshake").Base(err).WriteToLog(session.ExportIDToError(ctx)) } ctx, cancel := context.WithCancel(ctx) diff --git a/proxy/socks/server.go b/proxy/socks/server.go index bd3b165ca..cb46d230e 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -5,15 +5,15 @@ import ( "io" "time" - "v2ray.com/core/common/task" - "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal" + "v2ray.com/core/common/task" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" @@ -69,7 +69,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil { - newError("failed to set deadline").Base(err).WithContext(ctx).WriteToLog() + newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) } reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} @@ -78,12 +78,12 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispa if !ok { return newError("inbound entry point not specified") } - session := &ServerSession{ + svrSession := &ServerSession{ config: s.config, port: inboundDest.Port, } - request, err := session.Handshake(reader, conn) + request, err := svrSession.Handshake(reader, conn) if err != nil { if source, ok := proxy.SourceFromContext(ctx); ok { log.Record(&log.AccessMessage{ @@ -97,12 +97,12 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispa } if err := conn.SetReadDeadline(time.Time{}); err != nil { - newError("failed to clear deadline").Base(err).WithContext(ctx).WriteToLog() + newError("failed to clear deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) } if request.Command == protocol.RequestCommandTCP { dest := request.Destination() - newError("TCP Connect request to ", dest).WithContext(ctx).WriteToLog() + newError("TCP Connect request to ", dest).WriteToLog(session.ExportIDToError(ctx)) if source, ok := proxy.SourceFromContext(ctx); ok { log.Record(&log.AccessMessage{ From: source, @@ -175,7 +175,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, udpServer := udp.NewDispatcher(dispatcher) if source, ok := proxy.SourceFromContext(ctx); ok { - newError("client UDP connection from ", source).WithContext(ctx).WriteToLog() + newError("client UDP connection from ", source).WriteToLog(session.ExportIDToError(ctx)) } reader := buf.NewReader(conn) @@ -189,7 +189,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, request, err := DecodeUDPPacket(payload) if err != nil { - newError("failed to parse UDP request").Base(err).WithContext(ctx).WriteToLog() + newError("failed to parse UDP request").Base(err).WriteToLog(session.ExportIDToError(ctx)) payload.Release() continue } @@ -199,7 +199,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, continue } - newError("send packet to ", request.Destination(), " with ", payload.Len(), " bytes").AtDebug().WithContext(ctx).WriteToLog() + newError("send packet to ", request.Destination(), " with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx)) if source, ok := proxy.SourceFromContext(ctx); ok { log.Record(&log.AccessMessage{ From: source, @@ -210,14 +210,14 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, } udpServer.Dispatch(ctx, request.Destination(), payload, func(payload *buf.Buffer) { - newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WithContext(ctx).WriteToLog() + newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx)) udpMessage, err := EncodeUDPPacket(request, payload.Bytes()) payload.Release() defer udpMessage.Release() if err != nil { - newError("failed to write UDP response").AtWarning().Base(err).WithContext(ctx).WriteToLog() + newError("failed to write UDP response").AtWarning().Base(err).WriteToLog(session.ExportIDToError(ctx)) } conn.Write(udpMessage.Bytes()) // nolint: errcheck diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index f2890c99b..73be09876 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "v2ray.com/core/common/session" "v2ray.com/core/common/task" "v2ray.com/core" @@ -226,8 +227,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i reader := &buf.BufferedReader{Reader: buf.NewReader(connection)} - session := encoding.NewServerSession(h.clients, h.sessionHistory) - request, err := session.DecodeRequestHeader(reader) + svrSession := encoding.NewServerSession(h.clients, h.sessionHistory) + request, err := svrSession.DecodeRequestHeader(reader) if err != nil { if errors.Cause(err) != io.EOF { @@ -261,10 +262,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i }) } - newError("received request for ", request.Destination()).WithContext(ctx).WriteToLog() + newError("received request for ", request.Destination()).WriteToLog(session.ExportIDToError(ctx)) if err := connection.SetReadDeadline(time.Time{}); err != nil { - newError("unable to set back read deadline").Base(err).WithContext(ctx).WriteToLog() + newError("unable to set back read deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) } sessionPolicy = h.policyManager.ForLevel(request.User.Level) @@ -281,7 +282,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - return transferRequest(timer, session, request, reader, link.Writer) + return transferRequest(timer, svrSession, request, reader, link.Writer) } responseDone := func() error { @@ -292,7 +293,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i response := &protocol.ResponseHeader{ Command: h.generateCommand(ctx, request), } - return transferResponse(timer, session, request, response, link.Reader, writer) + return transferResponse(timer, svrSession, request, response, link.Reader, writer) } var requestDonePost = task.Single(requestDone, task.OnSuccess(task.Close(link.Writer))) @@ -311,7 +312,7 @@ func (h *Handler) generateCommand(ctx context.Context, request *protocol.Request if h.inboundHandlerManager != nil { handler, err := h.inboundHandlerManager.GetHandler(ctx, tag) if err != nil { - newError("failed to get detour handler: ", tag).Base(err).AtWarning().WithContext(ctx).WriteToLog() + newError("failed to get detour handler: ", tag).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx)) return nil } proxyHandler, port, availableMin := handler.GetRandomInboundProxy() @@ -321,7 +322,7 @@ func (h *Handler) generateCommand(ctx context.Context, request *protocol.Request availableMin = 255 } - newError("pick detour handler for port ", port, " for ", availableMin, " minutes.").AtDebug().WithContext(ctx).WriteToLog() + newError("pick detour handler for port ", port, " for ", availableMin, " minutes.").AtDebug().WriteToLog(session.ExportIDToError(ctx)) user := inboundHandler.GetUser(request.User.Email) if user == nil { return nil diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index cc4a32f90..5b7e7d4a6 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -6,6 +6,7 @@ import ( "context" "time" + "v2ray.com/core/common/session" "v2ray.com/core/common/task" "v2ray.com/core/transport/pipe" @@ -69,7 +70,7 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia if !ok { return newError("target not specified").AtError() } - newError("tunneling request to ", target, " via ", rec.Destination()).WithContext(ctx).WriteToLog() + newError("tunneling request to ", target, " via ", rec.Destination()).WriteToLog(session.ExportIDToError(ctx)) command := protocol.RequestCommandTCP if target.Network == net.Network_UDP { diff --git a/transport/internet/tcp/dialer.go b/transport/internet/tcp/dialer.go index d90ed46e2..d4f1a9fcc 100644 --- a/transport/internet/tcp/dialer.go +++ b/transport/internet/tcp/dialer.go @@ -5,6 +5,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tls" ) @@ -19,7 +20,7 @@ func getTCPSettingsFromContext(ctx context.Context) *Config { // Dial dials a new TCP connection to the given destination. func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) { - newError("dialing TCP to ", dest).WithContext(ctx).WriteToLog() + newError("dialing TCP to ", dest).WriteToLog(session.ExportIDToError(ctx)) src := internet.DialerSourceFromContext(ctx) conn, err := internet.DialSystem(ctx, src, dest) diff --git a/transport/internet/tcp/hub.go b/transport/internet/tcp/hub.go index 51f3e5a1b..7b5809285 100644 --- a/transport/internet/tcp/hub.go +++ b/transport/internet/tcp/hub.go @@ -7,6 +7,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tls" ) @@ -29,7 +30,7 @@ func ListenTCP(ctx context.Context, address net.Address, port net.Port, handler if err != nil { return nil, err } - newError("listening TCP on ", address, ":", port).WithContext(ctx).WriteToLog() + newError("listening TCP on ", address, ":", port).WriteToLog(session.ExportIDToError(ctx)) tcpSettings := getTCPSettingsFromContext(ctx) diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 792e92310..e07bb6b13 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -9,6 +9,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal" ) @@ -72,13 +73,13 @@ func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallba func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) { // TODO: Add user to destString - newError("dispatch request to: ", destination).AtDebug().WithContext(ctx).WriteToLog() + newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx)) conn := v.getInboundRay(destination, callback) outputStream := conn.link.Writer if outputStream != nil { if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil { - newError("failed to write first UDP payload").Base(err).WithContext(ctx).WriteToLog() + newError("failed to write first UDP payload").Base(err).WriteToLog(session.ExportIDToError(ctx)) conn.cancel() return } @@ -98,7 +99,7 @@ func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback mb, err := input.ReadMultiBuffer() if err != nil { - newError("failed to handle UDP input").Base(err).WithContext(ctx).WriteToLog() + newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx)) conn.cancel() return } diff --git a/transport/internet/websocket/dialer.go b/transport/internet/websocket/dialer.go index f0f5ef2aa..873e9bd05 100644 --- a/transport/internet/websocket/dialer.go +++ b/transport/internet/websocket/dialer.go @@ -8,13 +8,14 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tls" ) // Dial dials a WebSocket connection to the given destination. func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) { - newError("creating connection to ", dest).WithContext(ctx).WriteToLog() + newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx)) conn, err := dialWebsocket(ctx, dest) if err != nil {