1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-02 07:26:24 -05:00

remove dependency from errors to session

This commit is contained in:
Darien Raymond 2018-06-25 01:09:02 +02:00
parent 55897f0b22
commit 9977eadf8c
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
21 changed files with 119 additions and 115 deletions

View File

@ -12,6 +12,7 @@ import (
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/session"
"v2ray.com/core/common/stats" "v2ray.com/core/common/stats"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -143,7 +144,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
outbound.Reader = cReader outbound.Reader = cReader
domain, err := sniffer(ctx, snifferList, cReader) domain, err := sniffer(ctx, snifferList, cReader)
if err == nil { if err == nil {
newError("sniffed domain: ", domain).WithContext(ctx).WriteToLog() newError("sniffed domain: ", domain).WriteToLog(session.ExportIDToError(ctx))
destination.Address = net.ParseAddress(domain) destination.Address = net.ParseAddress(domain)
ctx = proxy.ContextWithTarget(ctx, destination) ctx = proxy.ContextWithTarget(ctx, destination)
} }
@ -189,13 +190,13 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link,
if d.router != nil { if d.router != nil {
if tag, err := d.router.PickRoute(ctx); err == nil { if tag, err := d.router.PickRoute(ctx); err == nil {
if handler := d.ohm.GetHandler(tag); handler != nil { if handler := d.ohm.GetHandler(tag); handler != nil {
newError("taking detour [", tag, "] for [", destination, "]").WithContext(ctx).WriteToLog() newError("taking detour [", tag, "] for [", destination, "]").WriteToLog(session.ExportIDToError(ctx))
dispatcher = handler dispatcher = handler
} else { } else {
newError("non existing tag: ", tag).AtWarning().WithContext(ctx).WriteToLog() newError("non existing tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx))
} }
} else { } else {
newError("default route for ", destination).WithContext(ctx).WriteToLog() newError("default route for ", destination).WriteToLog(session.ExportIDToError(ctx))
} }
} }
dispatcher.Dispatch(ctx, link) dispatcher.Dispatch(ctx, link)

View File

@ -10,6 +10,7 @@ import (
"v2ray.com/core/app/proxyman" "v2ray.com/core/app/proxyman"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
"v2ray.com/core/common/session"
) )
// Manager is to manage all inbound handlers. // Manager is to manage all inbound handlers.
@ -74,7 +75,7 @@ func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if handler, found := m.taggedHandlers[tag]; found { if handler, found := m.taggedHandlers[tag]; found {
if err := handler.Close(); err != nil { if err := handler.Close(); err != nil {
newError("failed to close handler ", tag).Base(err).AtWarning().WithContext(ctx).WriteToLog() newError("failed to close handler ", tag).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
} }
delete(m.taggedHandlers, tag) delete(m.taggedHandlers, tag)
return nil return nil

View File

@ -52,7 +52,7 @@ func (w *tcpWorker) callback(conn internet.Connection) {
if w.recvOrigDest { if w.recvOrigDest {
dest, err := tcp.GetOriginalDestination(conn) dest, err := tcp.GetOriginalDestination(conn)
if err != nil { if err != nil {
newError("failed to get original destination").WithContext(ctx).Base(err).WriteToLog() newError("failed to get original destination").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
if dest.IsValid() { if dest.IsValid() {
ctx = proxy.ContextWithOriginalTarget(ctx, dest) ctx = proxy.ContextWithOriginalTarget(ctx, dest)
@ -74,11 +74,11 @@ func (w *tcpWorker) callback(conn internet.Connection) {
} }
} }
if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil { if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil {
newError("connection ends").Base(err).WithContext(ctx).WriteToLog() newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
cancel() cancel()
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {
newError("failed to close connection").Base(err).WithContext(ctx).WriteToLog() newError("failed to close connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
} }

View File

@ -16,6 +16,7 @@ import (
"v2ray.com/core/common/log" "v2ray.com/core/common/log"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -170,10 +171,10 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
defer s.Close() // nolint: errcheck defer s.Close() // nolint: errcheck
defer writer.Close() // nolint: errcheck defer writer.Close() // nolint: errcheck
newError("dispatching request to ", dest).WithContext(ctx).WriteToLog() newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx))
if pReader, ok := s.input.(*pipe.Reader); ok { if pReader, ok := s.input.(*pipe.Reader); ok {
if err := copyFirstPayload(pReader, writer); err != nil { if err := copyFirstPayload(pReader, writer); err != nil {
newError("failed to fetch first payload").Base(err).WithContext(ctx).WriteToLog() newError("failed to fetch first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
writer.hasError = true writer.hasError = true
pipe.CloseError(s.input) pipe.CloseError(s.input)
return return
@ -181,7 +182,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
} }
if err := buf.Copy(s.input, writer); err != nil { if err := buf.Copy(s.input, writer); err != nil {
newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog() newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
writer.hasError = true writer.hasError = true
pipe.CloseError(s.input) pipe.CloseError(s.input)
return return
@ -346,7 +347,7 @@ type ServerWorker struct {
func handle(ctx context.Context, s *Session, output buf.Writer) { func handle(ctx context.Context, s *Session, output buf.Writer) {
writer := NewResponseWriter(s.ID, output, s.transferType) writer := NewResponseWriter(s.ID, output, s.transferType)
if err := buf.Copy(s.input, writer); err != nil { if err := buf.Copy(s.input, writer); err != nil {
newError("session ", s.ID, " ends.").Base(err).WithContext(ctx).WriteToLog() newError("session ", s.ID, " ends.").Base(err).WriteToLog(session.ExportIDToError(ctx))
writer.hasError = true writer.hasError = true
} }
@ -362,7 +363,7 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu
} }
func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error { func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
newError("received request for ", meta.Target).WithContext(ctx).WriteToLog() newError("received request for ", meta.Target).WriteToLog(session.ExportIDToError(ctx))
{ {
msg := &log.AccessMessage{ msg := &log.AccessMessage{
To: meta.Target, To: meta.Target,
@ -475,7 +476,7 @@ func (w *ServerWorker) run(ctx context.Context) {
err := w.handleFrame(ctx, reader) err := w.handleFrame(ctx, reader)
if err != nil { if err != nil {
if errors.Cause(err) != io.EOF { if errors.Cause(err) != io.EOF {
newError("unexpected EOF").Base(err).WithContext(ctx).WriteToLog() newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
pipe.CloseError(input) pipe.CloseError(input)
} }
return return

View File

@ -8,6 +8,7 @@ import (
"v2ray.com/core/app/proxyman/mux" "v2ray.com/core/app/proxyman/mux"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -77,13 +78,13 @@ func (h *Handler) Tag() string {
func (h *Handler) Dispatch(ctx context.Context, link *core.Link) { func (h *Handler) Dispatch(ctx context.Context, link *core.Link) {
if h.mux != nil { if h.mux != nil {
if err := h.mux.Dispatch(ctx, link); err != nil { if err := h.mux.Dispatch(ctx, link); err != nil {
newError("failed to process mux outbound traffic").Base(err).WithContext(ctx).WriteToLog() newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
pipe.CloseError(link.Writer) pipe.CloseError(link.Writer)
} }
} else { } else {
if err := h.proxy.Process(ctx, link, h); err != nil { if err := h.proxy.Process(ctx, link, h); err != nil {
// Ensure outbound ray is properly closed. // Ensure outbound ray is properly closed.
newError("failed to process outbound traffic").Base(err).WithContext(ctx).WriteToLog() newError("failed to process outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
pipe.CloseError(link.Writer) pipe.CloseError(link.Writer)
} else { } else {
common.Must(common.Close(link.Writer)) common.Must(common.Close(link.Writer))
@ -99,7 +100,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
tag := h.senderSettings.ProxySettings.Tag tag := h.senderSettings.ProxySettings.Tag
handler := h.outboundManager.GetHandler(tag) handler := h.outboundManager.GetHandler(tag)
if handler != nil { if handler != nil {
newError("proxying to ", tag, " for dest ", dest).AtDebug().WithContext(ctx).WriteToLog() newError("proxying to ", tag, " for dest ", dest).AtDebug().WriteToLog(session.ExportIDToError(ctx))
ctx = proxy.ContextWithTarget(ctx, dest) ctx = proxy.ContextWithTarget(ctx, dest)
opts := pipe.OptionsFromContext(ctx) opts := pipe.OptionsFromContext(ctx)
@ -110,7 +111,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil
} }
newError("failed to get outbound handler with tag: ", tag).AtWarning().WithContext(ctx).WriteToLog() newError("failed to get outbound handler with tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx))
} }
if h.senderSettings.Via != nil { if h.senderSettings.Via != nil {

View File

@ -2,13 +2,11 @@
package errors // import "v2ray.com/core/common/errors" package errors // import "v2ray.com/core/common/errors"
import ( import (
"context"
"os" "os"
"strings" "strings"
"v2ray.com/core/common/log" "v2ray.com/core/common/log"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
"v2ray.com/core/common/session"
) )
type hasInnerError interface { type hasInnerError interface {
@ -20,17 +18,13 @@ type hasSeverity interface {
Severity() log.Severity Severity() log.Severity
} }
type hasContext interface {
Context() context.Context
}
// Error is an error object with underlying error. // Error is an error object with underlying error.
type Error struct { type Error struct {
prefix []interface{}
path []string
message []interface{} message []interface{}
inner error inner error
severity log.Severity severity log.Severity
path []string
ctx context.Context
} }
// Error implements error.Error(). // Error implements error.Error().
@ -42,7 +36,13 @@ func (v *Error) Error() string {
if len(v.path) > 0 { if len(v.path) > 0 {
msg = strings.Join(v.path, "|") + ": " + msg msg = strings.Join(v.path, "|") + ": " + msg
} }
return msg
var prefix string
for _, p := range v.prefix {
prefix += "[" + serial.ToString(p) + "] "
}
return prefix + msg
} }
// Inner implements hasInnerError.Inner() // Inner implements hasInnerError.Inner()
@ -58,28 +58,6 @@ func (v *Error) Base(err error) *Error {
return v return v
} }
func (v *Error) WithContext(ctx context.Context) *Error {
v.ctx = ctx
return v
}
// Context returns the context that associated with the Error.
func (v *Error) Context() context.Context {
if v.ctx != nil {
return v.ctx
}
if v.inner == nil {
return nil
}
if c, ok := v.inner.(hasContext); ok {
return c.Context()
}
return nil
}
func (v *Error) atSeverity(s log.Severity) *Error { func (v *Error) atSeverity(s log.Severity) *Error {
v.severity = s v.severity = s
return v return v
@ -132,25 +110,29 @@ func (v *Error) String() string {
} }
// WriteToLog writes current error into log. // WriteToLog writes current error into log.
func (v *Error) WriteToLog() { func (v *Error) WriteToLog(opts ...ExportOption) {
ctx := v.Context() var holder ExportOptionHolder
var sid session.ID
if ctx != nil { for _, opt := range opts {
sid = session.IDFromContext(ctx) opt(&holder)
} }
var c interface{} = v
if sid > 0 { if holder.SessionID > 0 {
c = sessionLog{ v.prefix = append(v.prefix, holder.SessionID)
id: sid,
content: v,
}
} }
log.Record(&log.GeneralMessage{ log.Record(&log.GeneralMessage{
Severity: GetSeverity(v), Severity: GetSeverity(v),
Content: c, Content: v,
}) })
} }
type ExportOptionHolder struct {
SessionID uint32
}
type ExportOption func(*ExportOptionHolder)
// New returns a new error object with message formed from given arguments. // New returns a new error object with message formed from given arguments.
func New(msg ...interface{}) *Error { func New(msg ...interface{}) *Error {
return &Error{ return &Error{
@ -196,12 +178,3 @@ func GetSeverity(err error) log.Severity {
} }
return log.Severity_Info return log.Severity_Info
} }
type sessionLog struct {
id session.ID
content interface{}
}
func (s sessionLog) String() string {
return serial.Concat("[", s.id, "] ", s.content)
}

View File

@ -0,0 +1,6 @@
package session
type Request struct {
//Destination net.Destination
DecodedLayers []interface{}
}

View File

@ -4,6 +4,8 @@ package session // import "v2ray.com/core/common/session"
import ( import (
"context" "context"
"math/rand" "math/rand"
"v2ray.com/core/common/errors"
) )
// ID of a session. // ID of a session.
@ -38,3 +40,10 @@ func IDFromContext(ctx context.Context) ID {
} }
return 0 return 0
} }
func ExportIDToError(ctx context.Context) errors.ExportOption {
id := IDFromContext(ctx)
return func(h *errors.ExportOptionHolder) {
h.SessionID = uint32(id)
}
}

View File

@ -10,6 +10,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
@ -54,7 +55,7 @@ func (d *DokodemoDoor) policy() core.Policy {
} }
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error {
newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WithContext(ctx).WriteToLog() newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog(session.ExportIDToError(ctx))
dest := net.Destination{ dest := net.Destination{
Network: network, Network: network,
Address: d.address, Address: d.address,

View File

@ -12,6 +12,7 @@ import (
"v2ray.com/core/common/dice" "v2ray.com/core/common/dice"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/retry" "v2ray.com/core/common/retry"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
@ -56,7 +57,7 @@ func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address {
ips, err := h.dns.LookupIP(domain) ips, err := h.dns.LookupIP(domain)
if err != nil { if err != nil {
newError("failed to get IP address for domain ", domain).Base(err).WithContext(ctx).WriteToLog() newError("failed to get IP address for domain ", domain).Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
if len(ips) == 0 { if len(ips) == 0 {
return nil return nil
@ -75,7 +76,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
Port: net.Port(server.Port), Port: net.Port(server.Port),
} }
} }
newError("opening connection to ", destination).WithContext(ctx).WriteToLog() newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx))
input := link.Reader input := link.Reader
output := link.Writer output := link.Writer
@ -88,7 +89,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
Address: ip, Address: ip,
Port: destination.Port, Port: destination.Port,
} }
newError("changing destination to ", destination).WithContext(ctx).WriteToLog() newError("changing destination to ", destination).WriteToLog(session.ExportIDToError(ctx))
} }
} }

View File

@ -17,6 +17,7 @@ import (
"v2ray.com/core/common/log" "v2ray.com/core/common/log"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
http_proto "v2ray.com/core/common/protocol/http" http_proto "v2ray.com/core/common/protocol/http"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
@ -105,7 +106,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
Start: Start:
if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil { if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil {
newError("failed to set read deadline").Base(err).WithContext(ctx).WriteToLog() newError("failed to set read deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
request, err := http.ReadRequest(reader) request, err := http.ReadRequest(reader)
@ -124,9 +125,9 @@ Start:
} }
} }
newError("request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]").WithContext(ctx).WriteToLog() newError("request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]").WriteToLog(session.ExportIDToError(ctx))
if err := conn.SetReadDeadline(time.Time{}); err != nil { if err := conn.SetReadDeadline(time.Time{}); err != nil {
newError("failed to clear read deadline").Base(err).WithContext(ctx).WriteToLog() newError("failed to clear read deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
defaultPort := net.Port(80) defaultPort := net.Port(80)
@ -285,7 +286,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri
result = nil result = nil
} }
} else { } else {
newError("failed to read response from ", request.Host).Base(err).AtWarning().WithContext(ctx).WriteToLog() newError("failed to read response from ", request.Host).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
response = &http.Response{ response = &http.Response{
Status: "Service Unavailable", Status: "Service Unavailable",
StatusCode: 503, StatusCode: 503,

View File

@ -3,6 +3,7 @@ package shadowsocks
import ( import (
"context" "context"
"v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core" "v2ray.com/core"
@ -64,7 +65,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
if err != nil { if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err) return newError("failed to find an available destination").AtWarning().Base(err)
} }
newError("tunneling request to ", destination, " via ", server.Destination()).WithContext(ctx).WriteToLog() newError("tunneling request to ", destination, " via ", server.Destination()).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close() defer conn.Close()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"time" "time"
"v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core" "v2ray.com/core"
@ -86,7 +87,7 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
request, data, err := DecodeUDPPacket(s.user, payload) request, data, err := DecodeUDPPacket(s.user, payload)
if err != nil { if err != nil {
if source, ok := proxy.SourceFromContext(ctx); ok { if source, ok := proxy.SourceFromContext(ctx); ok {
newError("dropping invalid UDP packet from: ", source).Base(err).WithContext(ctx).WriteToLog() newError("dropping invalid UDP packet from: ", source).Base(err).WriteToLog(session.ExportIDToError(ctx))
log.Record(&log.AccessMessage{ log.Record(&log.AccessMessage{
From: source, From: source,
To: "", To: "",
@ -99,13 +100,13 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
} }
if request.Option.Has(RequestOptionOneTimeAuth) && s.account.OneTimeAuth == Account_Disabled { if request.Option.Has(RequestOptionOneTimeAuth) && s.account.OneTimeAuth == Account_Disabled {
newError("client payload enables OTA but server doesn't allow it").WithContext(ctx).WriteToLog() newError("client payload enables OTA but server doesn't allow it").WriteToLog(session.ExportIDToError(ctx))
payload.Release() payload.Release()
continue continue
} }
if !request.Option.Has(RequestOptionOneTimeAuth) && s.account.OneTimeAuth == Account_Enabled { if !request.Option.Has(RequestOptionOneTimeAuth) && s.account.OneTimeAuth == Account_Enabled {
newError("client payload disables OTA but server forces it").WithContext(ctx).WriteToLog() newError("client payload disables OTA but server forces it").WriteToLog(session.ExportIDToError(ctx))
payload.Release() payload.Release()
continue continue
} }
@ -119,14 +120,14 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
Reason: "", Reason: "",
}) })
} }
newError("tunnelling request to ", dest).WithContext(ctx).WriteToLog() newError("tunnelling request to ", dest).WriteToLog(session.ExportIDToError(ctx))
ctx = protocol.ContextWithUser(ctx, request.User) ctx = protocol.ContextWithUser(ctx, request.User)
udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) { udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) {
data, err := EncodeUDPPacket(request, payload.Bytes()) data, err := EncodeUDPPacket(request, payload.Bytes())
payload.Release() payload.Release()
if err != nil { if err != nil {
newError("failed to encode UDP packet").Base(err).AtWarning().WithContext(ctx).WriteToLog() newError("failed to encode UDP packet").Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
return return
} }
defer data.Release() defer data.Release()
@ -164,7 +165,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
Status: log.AccessAccepted, Status: log.AccessAccepted,
Reason: "", Reason: "",
}) })
newError("tunnelling request to ", dest).WithContext(ctx).WriteToLog() newError("tunnelling request to ", dest).WriteToLog(session.ExportIDToError(ctx))
ctx = protocol.ContextWithUser(ctx, request.User) ctx = protocol.ContextWithUser(ctx, request.User)

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"time" "time"
"v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core" "v2ray.com/core"
@ -66,7 +67,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
defer func() { defer func() {
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {
newError("failed to closed connection").Base(err).WithContext(ctx).WriteToLog() newError("failed to closed connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
}() }()
@ -89,7 +90,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
} }
if err := conn.SetDeadline(time.Now().Add(p.Timeouts.Handshake)); err != nil { if err := conn.SetDeadline(time.Now().Add(p.Timeouts.Handshake)); err != nil {
newError("failed to set deadline for handshake").Base(err).WithContext(ctx).WriteToLog() newError("failed to set deadline for handshake").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
udpRequest, err := ClientHandshake(request, conn, conn) udpRequest, err := ClientHandshake(request, conn, conn)
if err != nil { if err != nil {
@ -97,7 +98,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
} }
if err := conn.SetDeadline(time.Time{}); err != nil { if err := conn.SetDeadline(time.Time{}); err != nil {
newError("failed to clear deadline after handshake").Base(err).WithContext(ctx).WriteToLog() newError("failed to clear deadline after handshake").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)

View File

@ -5,15 +5,15 @@ import (
"io" "io"
"time" "time"
"v2ray.com/core/common/task"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/log" "v2ray.com/core/common/log"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/internet/udp"
@ -69,7 +69,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error {
if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil { if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil {
newError("failed to set deadline").Base(err).WithContext(ctx).WriteToLog() newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} reader := &buf.BufferedReader{Reader: buf.NewReader(conn)}
@ -78,12 +78,12 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispa
if !ok { if !ok {
return newError("inbound entry point not specified") return newError("inbound entry point not specified")
} }
session := &ServerSession{ svrSession := &ServerSession{
config: s.config, config: s.config,
port: inboundDest.Port, port: inboundDest.Port,
} }
request, err := session.Handshake(reader, conn) request, err := svrSession.Handshake(reader, conn)
if err != nil { if err != nil {
if source, ok := proxy.SourceFromContext(ctx); ok { if source, ok := proxy.SourceFromContext(ctx); ok {
log.Record(&log.AccessMessage{ log.Record(&log.AccessMessage{
@ -97,12 +97,12 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispa
} }
if err := conn.SetReadDeadline(time.Time{}); err != nil { if err := conn.SetReadDeadline(time.Time{}); err != nil {
newError("failed to clear deadline").Base(err).WithContext(ctx).WriteToLog() newError("failed to clear deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
if request.Command == protocol.RequestCommandTCP { if request.Command == protocol.RequestCommandTCP {
dest := request.Destination() dest := request.Destination()
newError("TCP Connect request to ", dest).WithContext(ctx).WriteToLog() newError("TCP Connect request to ", dest).WriteToLog(session.ExportIDToError(ctx))
if source, ok := proxy.SourceFromContext(ctx); ok { if source, ok := proxy.SourceFromContext(ctx); ok {
log.Record(&log.AccessMessage{ log.Record(&log.AccessMessage{
From: source, From: source,
@ -175,7 +175,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection,
udpServer := udp.NewDispatcher(dispatcher) udpServer := udp.NewDispatcher(dispatcher)
if source, ok := proxy.SourceFromContext(ctx); ok { if source, ok := proxy.SourceFromContext(ctx); ok {
newError("client UDP connection from ", source).WithContext(ctx).WriteToLog() newError("client UDP connection from ", source).WriteToLog(session.ExportIDToError(ctx))
} }
reader := buf.NewReader(conn) reader := buf.NewReader(conn)
@ -189,7 +189,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection,
request, err := DecodeUDPPacket(payload) request, err := DecodeUDPPacket(payload)
if err != nil { if err != nil {
newError("failed to parse UDP request").Base(err).WithContext(ctx).WriteToLog() newError("failed to parse UDP request").Base(err).WriteToLog(session.ExportIDToError(ctx))
payload.Release() payload.Release()
continue continue
} }
@ -199,7 +199,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection,
continue continue
} }
newError("send packet to ", request.Destination(), " with ", payload.Len(), " bytes").AtDebug().WithContext(ctx).WriteToLog() newError("send packet to ", request.Destination(), " with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx))
if source, ok := proxy.SourceFromContext(ctx); ok { if source, ok := proxy.SourceFromContext(ctx); ok {
log.Record(&log.AccessMessage{ log.Record(&log.AccessMessage{
From: source, From: source,
@ -210,14 +210,14 @@ func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection,
} }
udpServer.Dispatch(ctx, request.Destination(), payload, func(payload *buf.Buffer) { udpServer.Dispatch(ctx, request.Destination(), payload, func(payload *buf.Buffer) {
newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WithContext(ctx).WriteToLog() newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx))
udpMessage, err := EncodeUDPPacket(request, payload.Bytes()) udpMessage, err := EncodeUDPPacket(request, payload.Bytes())
payload.Release() payload.Release()
defer udpMessage.Release() defer udpMessage.Release()
if err != nil { if err != nil {
newError("failed to write UDP response").AtWarning().Base(err).WithContext(ctx).WriteToLog() newError("failed to write UDP response").AtWarning().Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
conn.Write(udpMessage.Bytes()) // nolint: errcheck conn.Write(udpMessage.Bytes()) // nolint: errcheck

View File

@ -9,6 +9,7 @@ import (
"sync" "sync"
"time" "time"
"v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core" "v2ray.com/core"
@ -226,8 +227,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
reader := &buf.BufferedReader{Reader: buf.NewReader(connection)} reader := &buf.BufferedReader{Reader: buf.NewReader(connection)}
session := encoding.NewServerSession(h.clients, h.sessionHistory) svrSession := encoding.NewServerSession(h.clients, h.sessionHistory)
request, err := session.DecodeRequestHeader(reader) request, err := svrSession.DecodeRequestHeader(reader)
if err != nil { if err != nil {
if errors.Cause(err) != io.EOF { if errors.Cause(err) != io.EOF {
@ -261,10 +262,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
}) })
} }
newError("received request for ", request.Destination()).WithContext(ctx).WriteToLog() newError("received request for ", request.Destination()).WriteToLog(session.ExportIDToError(ctx))
if err := connection.SetReadDeadline(time.Time{}); err != nil { if err := connection.SetReadDeadline(time.Time{}); err != nil {
newError("unable to set back read deadline").Base(err).WithContext(ctx).WriteToLog() newError("unable to set back read deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
} }
sessionPolicy = h.policyManager.ForLevel(request.User.Level) sessionPolicy = h.policyManager.ForLevel(request.User.Level)
@ -281,7 +282,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
requestDone := func() error { requestDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
return transferRequest(timer, session, request, reader, link.Writer) return transferRequest(timer, svrSession, request, reader, link.Writer)
} }
responseDone := func() error { responseDone := func() error {
@ -292,7 +293,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
response := &protocol.ResponseHeader{ response := &protocol.ResponseHeader{
Command: h.generateCommand(ctx, request), Command: h.generateCommand(ctx, request),
} }
return transferResponse(timer, session, request, response, link.Reader, writer) return transferResponse(timer, svrSession, request, response, link.Reader, writer)
} }
var requestDonePost = task.Single(requestDone, task.OnSuccess(task.Close(link.Writer))) var requestDonePost = task.Single(requestDone, task.OnSuccess(task.Close(link.Writer)))
@ -311,7 +312,7 @@ func (h *Handler) generateCommand(ctx context.Context, request *protocol.Request
if h.inboundHandlerManager != nil { if h.inboundHandlerManager != nil {
handler, err := h.inboundHandlerManager.GetHandler(ctx, tag) handler, err := h.inboundHandlerManager.GetHandler(ctx, tag)
if err != nil { if err != nil {
newError("failed to get detour handler: ", tag).Base(err).AtWarning().WithContext(ctx).WriteToLog() newError("failed to get detour handler: ", tag).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
return nil return nil
} }
proxyHandler, port, availableMin := handler.GetRandomInboundProxy() proxyHandler, port, availableMin := handler.GetRandomInboundProxy()
@ -321,7 +322,7 @@ func (h *Handler) generateCommand(ctx context.Context, request *protocol.Request
availableMin = 255 availableMin = 255
} }
newError("pick detour handler for port ", port, " for ", availableMin, " minutes.").AtDebug().WithContext(ctx).WriteToLog() newError("pick detour handler for port ", port, " for ", availableMin, " minutes.").AtDebug().WriteToLog(session.ExportIDToError(ctx))
user := inboundHandler.GetUser(request.User.Email) user := inboundHandler.GetUser(request.User.Email)
if user == nil { if user == nil {
return nil return nil

View File

@ -6,6 +6,7 @@ import (
"context" "context"
"time" "time"
"v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -69,7 +70,7 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
if !ok { if !ok {
return newError("target not specified").AtError() return newError("target not specified").AtError()
} }
newError("tunneling request to ", target, " via ", rec.Destination()).WithContext(ctx).WriteToLog() newError("tunneling request to ", target, " via ", rec.Destination()).WriteToLog(session.ExportIDToError(ctx))
command := protocol.RequestCommandTCP command := protocol.RequestCommandTCP
if target.Network == net.Network_UDP { if target.Network == net.Network_UDP {

View File

@ -5,6 +5,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls" "v2ray.com/core/transport/internet/tls"
) )
@ -19,7 +20,7 @@ func getTCPSettingsFromContext(ctx context.Context) *Config {
// Dial dials a new TCP connection to the given destination. // Dial dials a new TCP connection to the given destination.
func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) { func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) {
newError("dialing TCP to ", dest).WithContext(ctx).WriteToLog() newError("dialing TCP to ", dest).WriteToLog(session.ExportIDToError(ctx))
src := internet.DialerSourceFromContext(ctx) src := internet.DialerSourceFromContext(ctx)
conn, err := internet.DialSystem(ctx, src, dest) conn, err := internet.DialSystem(ctx, src, dest)

View File

@ -7,6 +7,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls" "v2ray.com/core/transport/internet/tls"
) )
@ -29,7 +30,7 @@ func ListenTCP(ctx context.Context, address net.Address, port net.Port, handler
if err != nil { if err != nil {
return nil, err return nil, err
} }
newError("listening TCP on ", address, ":", port).WithContext(ctx).WriteToLog() newError("listening TCP on ", address, ":", port).WriteToLog(session.ExportIDToError(ctx))
tcpSettings := getTCPSettingsFromContext(ctx) tcpSettings := getTCPSettingsFromContext(ctx)

View File

@ -9,6 +9,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
) )
@ -72,13 +73,13 @@ func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallba
func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) { func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) {
// TODO: Add user to destString // TODO: Add user to destString
newError("dispatch request to: ", destination).AtDebug().WithContext(ctx).WriteToLog() newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx))
conn := v.getInboundRay(destination, callback) conn := v.getInboundRay(destination, callback)
outputStream := conn.link.Writer outputStream := conn.link.Writer
if outputStream != nil { if outputStream != nil {
if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil { if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {
newError("failed to write first UDP payload").Base(err).WithContext(ctx).WriteToLog() newError("failed to write first UDP payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
conn.cancel() conn.cancel()
return return
} }
@ -98,7 +99,7 @@ func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback
mb, err := input.ReadMultiBuffer() mb, err := input.ReadMultiBuffer()
if err != nil { if err != nil {
newError("failed to handle UDP input").Base(err).WithContext(ctx).WriteToLog() newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx))
conn.cancel() conn.cancel()
return return
} }

View File

@ -8,13 +8,14 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls" "v2ray.com/core/transport/internet/tls"
) )
// Dial dials a WebSocket connection to the given destination. // Dial dials a WebSocket connection to the given destination.
func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) { func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) {
newError("creating connection to ", dest).WithContext(ctx).WriteToLog() newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx))
conn, err := dialWebsocket(ctx, dest) conn, err := dialWebsocket(ctx, dest)
if err != nil { if err != nil {