diff --git a/app/dispatcher/dispatcher.go b/app/dispatcher/dispatcher.go index ef434fb17..526674a95 100644 --- a/app/dispatcher/dispatcher.go +++ b/app/dispatcher/dispatcher.go @@ -4,12 +4,13 @@ import ( "context" "v2ray.com/core/app" + "v2ray.com/core/common/net" "v2ray.com/core/transport/ray" ) // Interface dispatch a packet and possibly further network payload to its destination. type Interface interface { - DispatchToOutbound(ctx context.Context) ray.InboundRay + Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) Start() error Close() } diff --git a/app/dispatcher/impl/default.go b/app/dispatcher/impl/default.go index 5757ef007..87c43d89c 100644 --- a/app/dispatcher/impl/default.go +++ b/app/dispatcher/impl/default.go @@ -12,6 +12,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" + "v2ray.com/core/common/net" "v2ray.com/core/proxy" "v2ray.com/core/transport/ray" ) @@ -48,13 +49,14 @@ func (DefaultDispatcher) Interface() interface{} { return (*dispatcher.Interface)(nil) } -func (v *DefaultDispatcher) DispatchToOutbound(ctx context.Context) ray.InboundRay { +func (v *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (ray.InboundRay, error) { dispatcher := v.ohm.GetDefaultHandler() - destination := proxy.DestinationFromContext(ctx) if !destination.IsValid() { panic("Dispatcher: Invalid destination.") } + ctx = proxy.ContextWithDestination(ctx, destination) + if v.router != nil { if tag, err := v.router.TakeDetour(ctx); err == nil { if handler := v.ohm.GetHandler(tag); handler != nil { @@ -82,7 +84,7 @@ func (v *DefaultDispatcher) DispatchToOutbound(ctx context.Context) ray.InboundR go v.waitAndDispatch(ctx, waitFunc, direct, dispatcher) - return direct + return direct, nil } func (v *DefaultDispatcher) waitAndDispatch(ctx context.Context, wait func() error, link ray.OutboundRay, dispatcher proxyman.OutboundHandler) { diff --git a/app/proxyman/inbound/always.go b/app/proxyman/inbound/always.go index be0e9658d..06d101aa0 100644 --- a/app/proxyman/inbound/always.go +++ b/app/proxyman/inbound/always.go @@ -3,16 +3,21 @@ package inbound import ( "context" + "v2ray.com/core/app" + "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/app/proxyman" "v2ray.com/core/common/dice" - "v2ray.com/core/app/log" + "v2ray.com/core/common/errors" "v2ray.com/core/common/net" "v2ray.com/core/proxy" + "v2ray.com/core/transport/ray" ) type AlwaysOnInboundHandler struct { - proxy proxy.Inbound - workers []worker + proxy proxy.Inbound + workers []worker + dispatcher dispatcher.Interface } func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) { @@ -25,6 +30,16 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig * proxy: p, } + space := app.SpaceFromContext(ctx) + space.OnInitialize(func() error { + d := dispatcher.FromSpace(space) + if d == nil { + return errors.New("Proxyman|DefaultInboundHandler: No dispatcher in space.") + } + h.dispatcher = d + return nil + }) + nl := p.Network() pr := receiverConfig.PortRange address := receiverConfig.Listen.AsAddress() @@ -42,6 +57,7 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig * recvOrigDest: receiverConfig.ReceiveOriginalDestination, tag: tag, allowPassiveConn: receiverConfig.AllowPassiveConnection, + dispatcher: h, } h.workers = append(h.workers, worker) } @@ -53,6 +69,7 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig * address: address, port: net.Port(port), recvOrigDest: receiverConfig.ReceiveOriginalDestination, + dispatcher: h, } h.workers = append(h.workers, worker) } @@ -80,3 +97,7 @@ func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (proxy.Inbound, net.Por w := h.workers[dice.Roll(len(h.workers))] return w.Proxy(), w.Port(), 9999 } + +func (h *AlwaysOnInboundHandler) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) { + return h.dispatcher.Dispatch(ctx, dest) +} diff --git a/app/proxyman/inbound/dynamic.go b/app/proxyman/inbound/dynamic.go index 91d03f380..69934ec49 100644 --- a/app/proxyman/inbound/dynamic.go +++ b/app/proxyman/inbound/dynamic.go @@ -2,14 +2,18 @@ package inbound import ( "context" + "errors" "sync" "time" + "v2ray.com/core/app" + "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/app/proxyman" "v2ray.com/core/common/dice" - "v2ray.com/core/app/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" + "v2ray.com/core/transport/ray" ) type DynamicInboundHandler struct { @@ -23,6 +27,7 @@ type DynamicInboundHandler struct { workerMutex sync.RWMutex worker []worker lastRefresh time.Time + dispatcher dispatcher.Interface } func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*DynamicInboundHandler, error) { @@ -36,6 +41,16 @@ func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *p portsInUse: make(map[v2net.Port]bool), } + space := app.SpaceFromContext(ctx) + space.OnInitialize(func() error { + d := dispatcher.FromSpace(space) + if d == nil { + return errors.New("Proxyman|DefaultInboundHandler: No dispatcher in space.") + } + h.dispatcher = d + return nil + }) + return h, nil } @@ -102,6 +117,7 @@ func (h *DynamicInboundHandler) refresh() error { stream: h.receiverConfig.StreamSettings, recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, allowPassiveConn: h.receiverConfig.AllowPassiveConnection, + dispatcher: h, } if err := worker.Start(); err != nil { log.Warning("Proxyman:InboundHandler: Failed to create TCP worker: ", err) @@ -117,6 +133,7 @@ func (h *DynamicInboundHandler) refresh() error { address: address, port: port, recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, + dispatcher: h, } if err := worker.Start(); err != nil { log.Warning("Proxyman:InboundHandler: Failed to create UDP worker: ", err) @@ -164,3 +181,7 @@ func (h *DynamicInboundHandler) GetRandomInboundProxy() (proxy.Inbound, v2net.Po expire := h.receiverConfig.AllocationStrategy.GetRefreshValue() - uint32(time.Since(h.lastRefresh)/time.Minute) return w.Proxy(), w.Port(), int(expire) } + +func (h *DynamicInboundHandler) Dispatch(ctx context.Context, dest v2net.Destination) (ray.InboundRay, error) { + return h.dispatcher.Dispatch(ctx, dest) +} diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index a1b918aee..90685d696 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "time" + "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/common/buf" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" @@ -31,6 +33,7 @@ type tcpWorker struct { recvOrigDest bool tag string allowPassiveConn bool + dispatcher dispatcher.Interface ctx context.Context cancel context.CancelFunc @@ -51,7 +54,9 @@ func (w *tcpWorker) callback(conn internet.Connection) { ctx = proxy.ContextWithAllowPassiveConnection(ctx, w.allowPassiveConn) ctx = proxy.ContextWithInboundDestination(ctx, v2net.TCPDestination(w.address, w.port)) ctx = proxy.ContextWithSource(ctx, v2net.DestinationFromAddr(conn.RemoteAddr())) - w.proxy.Process(ctx, v2net.Network_TCP, conn) + if err := w.proxy.Process(ctx, v2net.Network_TCP, conn, w.dispatcher); err != nil { + log.Info("Proxyman|TCPWorker: Connection ends with ", err) + } cancel() conn.Close() } @@ -151,6 +156,7 @@ type udpWorker struct { port v2net.Port recvOrigDest bool tag string + dispatcher dispatcher.Interface ctx context.Context cancel context.CancelFunc @@ -206,7 +212,9 @@ func (w *udpWorker) callback(b *buf.Buffer, source v2net.Destination, originalDe } ctx = proxy.ContextWithSource(ctx, source) ctx = proxy.ContextWithInboundDestination(ctx, v2net.UDPDestination(w.address, w.port)) - w.proxy.Process(ctx, v2net.Network_UDP, conn) + if err := w.proxy.Process(ctx, v2net.Network_UDP, conn, w.dispatcher); err != nil { + log.Info("Proxyman|UDPWorker: Connection ends with ", err) + } w.removeConn(source) cancel() }() diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index d162baa2e..365c9bee7 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -7,10 +7,10 @@ import ( "time" "v2ray.com/core/app" + "v2ray.com/core/app/log" "v2ray.com/core/app/proxyman" "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" @@ -64,8 +64,7 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H } func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { - ctx = proxy.ContextWithDialer(ctx, h) - err := h.proxy.Process(ctx, outboundRay) + err := h.proxy.Process(ctx, outboundRay, h) // Ensure outbound ray is properly closed. if err != nil && errors.Cause(err) != io.EOF { outboundRay.OutboundOutput().CloseError() diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index 43e3ff2e5..17116d3fe 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -7,6 +7,7 @@ import ( "time" "v2ray.com/core/common" + "v2ray.com/core/proxy" "v2ray.com/core/transport/ray" ) @@ -31,7 +32,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } // Dispatch implements OutboundHandler.Dispatch(). -func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) error { +func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { v.response.WriteTo(outboundRay.OutboundOutput()) // Sleep a little here to make sure the response is sent to client. time.Sleep(time.Second) diff --git a/proxy/context.go b/proxy/context.go index 7109a9b55..586c54ea8 100644 --- a/proxy/context.go +++ b/proxy/context.go @@ -9,8 +9,7 @@ import ( type key int const ( - dialerKey key = iota - sourceKey + sourceKey key = iota destinationKey originalDestinationKey inboundDestinationKey @@ -18,20 +17,9 @@ const ( outboundTagKey resolvedIPsKey allowPassiveConnKey + dispatcherKey ) -func ContextWithDialer(ctx context.Context, dialer Dialer) context.Context { - return context.WithValue(ctx, dialerKey, dialer) -} - -func DialerFromContext(ctx context.Context) Dialer { - v := ctx.Value(dialerKey) - if v == nil { - return nil - } - return v.(Dialer) -} - func ContextWithSource(ctx context.Context, src net.Destination) context.Context { return context.WithValue(ctx, sourceKey, src) } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 8aa537980..3f729d220 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -7,10 +7,10 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/signal" "v2ray.com/core/proxy" @@ -18,10 +18,9 @@ import ( ) type DokodemoDoor struct { - config *Config - address net.Address - port net.Port - packetDispatcher dispatcher.Interface + config *Config + address net.Address + port net.Port } func New(ctx context.Context, config *Config) (*DokodemoDoor, error) { @@ -37,13 +36,6 @@ func New(ctx context.Context, config *Config) (*DokodemoDoor, error) { address: config.GetPredefinedAddress(), port: net.Port(config.Port), } - space.OnInitialize(func() error { - d.packetDispatcher = dispatcher.FromSpace(space) - if d.packetDispatcher == nil { - return errors.New("Dokodemo: Dispatcher is not found in the space.") - } - return nil - }) return d, nil } @@ -51,7 +43,7 @@ func (d *DokodemoDoor) Network() net.NetworkList { return *(d.config.NetworkList) } -func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection) error { +func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { log.Debug("Dokodemo: processing connection from: ", conn.RemoteAddr()) conn.SetReusable(false) dest := net.Destination{ @@ -68,7 +60,6 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in log.Info("Dokodemo: Invalid destination. Discarding...") return errors.New("Dokodemo: Unable to get destination.") } - ctx = proxy.ContextWithDestination(ctx, dest) ctx, cancel := context.WithCancel(ctx) timeout := time.Second * time.Duration(d.config.Timeout) if timeout == 0 { @@ -76,7 +67,10 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in } timer := signal.CancelAfterInactivity(ctx, cancel, timeout) - inboundRay := d.packetDispatcher.DispatchToOutbound(ctx) + inboundRay, err := dispatcher.Dispatch(ctx, dest) + if err != nil { + return err + } requestDone := signal.ExecuteAsync(func() error { defer inboundRay.InboundInput().Close() diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 08095f4d1..6d768e102 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -8,11 +8,11 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/dns" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/dice" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/retry" "v2ray.com/core/common/signal" @@ -73,7 +73,7 @@ func (v *Handler) ResolveIP(destination net.Destination) net.Destination { return newDest } -func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) error { +func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { destination := proxy.DestinationFromContext(ctx) if v.destOverride != nil { server := v.destOverride.Server @@ -93,7 +93,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) erro destination = v.ResolveIP(destination) } - dialer := proxy.DialerFromContext(ctx) err := retry.ExponentialBackoff(5, 100).On(func() error { rawConn, err := dialer.Dial(ctx, destination) if err != nil { diff --git a/proxy/http/server.go b/proxy/http/server.go index 019f89d8d..ac6956db4 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -8,27 +8,23 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/signal" - "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" ) // Server is a HTTP proxy server. type Server struct { - sync.Mutex - packetDispatcher dispatcher.Interface - config *ServerConfig + config *ServerConfig } // NewServer creates a new HTTP inbound handler. @@ -40,13 +36,6 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { s := &Server{ config: config, } - space.OnInitialize(func() error { - s.packetDispatcher = dispatcher.FromSpace(space) - if s.packetDispatcher == nil { - return errors.New("HTTP|Server: Dispatcher not found in space.") - } - return nil - }) return s, nil } @@ -79,7 +68,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error return v2net.TCPDestination(v2net.DomainAddress(host), port), nil } -func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection) error { +func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { conn.SetReusable(false) conn.SetReadDeadline(time.Now().Add(time.Second * 8)) @@ -109,15 +98,15 @@ func (s *Server) Process(ctx context.Context, network v2net.Network, conn intern return err } log.Access(conn.RemoteAddr(), request.URL, log.AccessAccepted, "") - ctx = proxy.ContextWithDestination(ctx, dest) + if strings.ToUpper(request.Method) == "CONNECT" { - return s.handleConnect(ctx, request, reader, conn) + return s.handleConnect(ctx, request, reader, conn, dest, dispatcher) } else { - return s.handlePlainHTTP(ctx, request, reader, conn) + return s.handlePlainHTTP(ctx, request, reader, conn, dest, dispatcher) } } -func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer) error { +func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer, dest v2net.Destination, dispatcher dispatcher.Interface) error { response := &http.Response{ Status: "200 OK", StatusCode: 200, @@ -140,7 +129,10 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade timeout = time.Minute * 2 } timer := signal.CancelAfterInactivity(ctx, cancel, timeout) - ray := s.packetDispatcher.DispatchToOutbound(ctx) + ray, err := dispatcher.Dispatch(ctx, dest) + if err != nil { + return err + } requestDone := signal.ExecuteAsync(func() error { defer ray.InboundInput().Close() @@ -213,7 +205,7 @@ func generateResponse(statusCode int, status string) *http.Response { } } -func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer) error { +func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer, dest v2net.Destination, dispatcher dispatcher.Interface) error { if len(request.URL.Host) <= 0 { response := generateResponse(400, "Bad Request") return response.Write(writer) @@ -222,7 +214,10 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, rea request.Host = request.URL.Host StripHopByHopHeaders(request) - ray := s.packetDispatcher.DispatchToOutbound(ctx) + ray, err := dispatcher.Dispatch(ctx, dest) + if err != nil { + return err + } input := ray.InboundInput() output := ray.InboundOutput() defer input.Close() diff --git a/proxy/proxy.go b/proxy/proxy.go index ddcea305e..ce57472cc 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -4,6 +4,7 @@ package proxy import ( "context" + "v2ray.com/core/app/dispatcher" "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/ray" @@ -13,12 +14,12 @@ import ( type Inbound interface { Network() net.NetworkList - Process(context.Context, net.Network, internet.Connection) error + Process(context.Context, net.Network, internet.Connection, dispatcher.Interface) error } // An Outbound process outbound connections. type Outbound interface { - Process(context.Context, ray.OutboundRay) error + Process(context.Context, ray.OutboundRay, Dialer) error } // Dialer is used by OutboundHandler for creating outbound connections. diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 44368328d..43fc09d4d 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -7,10 +7,10 @@ import ( "runtime" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" @@ -39,14 +39,13 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { } // Process implements OutboundHandler.Process(). -func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error { +func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { destination := proxy.DestinationFromContext(ctx) network := destination.Network var server *protocol.ServerSpec var conn internet.Connection - dialer := proxy.DialerFromContext(ctx) err := retry.ExponentialBackoff(5, 100).On(func() error { server = v.serverPicker.PickServer() dest := server.Destination() diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 984ed20f9..437c99b12 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -2,17 +2,16 @@ package shadowsocks import ( "context" - "time" - "runtime" + "time" "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/signal" @@ -22,11 +21,9 @@ import ( ) type Server struct { - packetDispatcher dispatcher.Interface - config *ServerConfig - user *protocol.User - account *ShadowsocksAccount - udpServer *udp.Dispatcher + config *ServerConfig + user *protocol.User + account *ShadowsocksAccount } func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { @@ -50,14 +47,6 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { account: account, } - space.OnInitialize(func() error { - s.packetDispatcher = dispatcher.FromSpace(space) - if s.packetDispatcher == nil { - return errors.New("Shadowsocks|Server: Dispatcher is not found in space.") - } - return nil - }) - return s, nil } @@ -71,20 +60,21 @@ func (s *Server) Network() net.NetworkList { return list } -func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error { +func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { conn.SetReusable(false) switch network { case net.Network_TCP: - return s.handleConnection(ctx, conn) + return s.handleConnection(ctx, conn, dispatcher) case net.Network_UDP: - return s.handlerUDPPayload(ctx, conn) + return s.handlerUDPPayload(ctx, conn, dispatcher) default: return errors.New("Shadowsocks|Server: Unknown network: ", network) } } -func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection) error { +func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error { + udpServer := udp.NewDispatcher(dispatcher) source := proxy.SourceFromContext(ctx) reader := buf.NewReader(conn) @@ -119,7 +109,7 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection log.Info("Shadowsocks|Server: Tunnelling request to ", dest) ctx = protocol.ContextWithUser(ctx, request.User) - v.udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) { + udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) { defer payload.Release() data, err := EncodeUDPPacket(request, payload) @@ -136,7 +126,7 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection return nil } -func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error { +func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error { conn.SetReadDeadline(time.Now().Add(time.Second * 8)) bufferedReader := bufio.NewReader(conn) request, bodyReader, err := ReadTCPSession(s.user, bufferedReader) @@ -153,13 +143,15 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "") log.Info("Shadowsocks|Server: Tunnelling request to ", dest) - ctx = proxy.ContextWithDestination(ctx, dest) ctx = protocol.ContextWithUser(ctx, request.User) ctx, cancel := context.WithCancel(ctx) userSettings := s.user.GetSettings() timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout) - ray := s.packetDispatcher.DispatchToOutbound(ctx) + ray, err := dispatcher.Dispatch(ctx, dest) + if err != nil { + return err + } requestDone := signal.ExecuteAsync(func() error { bufferedWriter := bufio.NewWriter(conn) diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 2aca0af4e..2156f720a 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -6,9 +6,9 @@ import ( "runtime" "time" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" @@ -34,13 +34,12 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { return client, nil } -func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error { +func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.Dialer) error { destination := proxy.DestinationFromContext(ctx) var server *protocol.ServerSpec var conn internet.Connection - dialer := proxy.DialerFromContext(ctx) err := retry.ExponentialBackoff(5, 100).On(func() error { server = c.serverPicker.PickServer() dest := server.Destination() diff --git a/proxy/socks/server.go b/proxy/socks/server.go index b4de26a8b..4ed8abbf6 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -8,11 +8,11 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/signal" @@ -23,9 +23,7 @@ import ( // Server is a SOCKS 5 proxy server type Server struct { - packetDispatcher dispatcher.Interface - config *ServerConfig - udpServer *udp.Dispatcher + config *ServerConfig } // NewServer creates a new Server object. @@ -37,14 +35,6 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { s := &Server{ config: config, } - space.OnInitialize(func() error { - s.packetDispatcher = dispatcher.FromSpace(space) - if s.packetDispatcher == nil { - return errors.New("Socks|Server: Dispatcher is not found in the space.") - } - s.udpServer = udp.NewDispatcher(s.packetDispatcher) - return nil - }) return s, nil } @@ -58,20 +48,20 @@ func (s *Server) Network() net.NetworkList { return list } -func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error { +func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { conn.SetReusable(false) switch network { case net.Network_TCP: - return s.processTCP(ctx, conn) + return s.processTCP(ctx, conn, dispatcher) case net.Network_UDP: - return s.handleUDPPayload(ctx, conn) + return s.handleUDPPayload(ctx, conn, dispatcher) default: return errors.New("Socks|Server: Unknown network: ", network) } } -func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error { +func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error { conn.SetReadDeadline(time.Now().Add(time.Second * 8)) reader := bufio.NewReader(conn) @@ -95,8 +85,7 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error log.Info("Socks|Server: TCP Connect request to ", dest) log.Access(source, dest, log.AccessAccepted, "") - ctx = proxy.ContextWithDestination(ctx, dest) - return s.transport(ctx, reader, conn) + return s.transport(ctx, reader, conn, dest, dispatcher) } if request.Command == protocol.RequestCommandUDP { @@ -115,7 +104,7 @@ func (*Server) handleUDP() error { return nil } -func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer) error { +func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error { ctx, cancel := context.WithCancel(ctx) timeout := time.Second * time.Duration(v.config.Timeout) if timeout == 0 { @@ -123,7 +112,11 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ } timer := signal.CancelAfterInactivity(ctx, cancel, timeout) - ray := v.packetDispatcher.DispatchToOutbound(ctx) + ray, err := dispatcher.Dispatch(ctx, dest) + if err != nil { + return err + } + input := ray.InboundInput() output := ray.InboundOutput() @@ -159,7 +152,9 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ return nil } -func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection) error { +func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error { + udpServer := udp.NewDispatcher(dispatcher) + source := proxy.SourceFromContext(ctx) log.Info("Socks|Server: Client UDP connection from ", source) @@ -185,7 +180,7 @@ func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection) dataBuf := buf.NewSmall() dataBuf.Append(data) - v.udpServer.Dispatch(ctx, request.Destination(), dataBuf, func(payload *buf.Buffer) { + udpServer.Dispatch(ctx, request.Destination(), dataBuf, func(payload *buf.Buffer) { defer payload.Release() log.Info("Socks|Server: Writing back UDP response with ", payload.Len(), " bytes") diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index ecf9034f7..1d3d202e7 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -10,18 +10,17 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/log" "v2ray.com/core/app/proxyman" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/serial" "v2ray.com/core/common/signal" "v2ray.com/core/common/uuid" - "v2ray.com/core/proxy" "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/transport/internet" @@ -75,8 +74,6 @@ func (v *userByEmail) Get(email string) (*protocol.User, bool) { // Inbound connection handler that handles messages in VMess format. type VMessInboundHandler struct { - sync.RWMutex - packetDispatcher dispatcher.Interface inboundHandlerManager proxyman.InboundHandlerManager clients protocol.UserValidator usersByEmail *userByEmail @@ -101,10 +98,6 @@ func New(ctx context.Context, config *Config) (*VMessInboundHandler, error) { } space.OnInitialize(func() error { - handler.packetDispatcher = dispatcher.FromSpace(space) - if handler.packetDispatcher == nil { - return errors.New("VMess|Inbound: Dispatcher is not found in space.") - } handler.inboundHandlerManager = proxyman.InboundHandlerManagerFromSpace(space) if handler.inboundHandlerManager == nil { return errors.New("VMess|Inbound: InboundHandlerManager is not found is space.") @@ -122,9 +115,6 @@ func (*VMessInboundHandler) Network() net.NetworkList { } func (v *VMessInboundHandler) GetUser(email string) *protocol.User { - v.RLock() - defer v.RUnlock() - user, existing := v.usersByEmail.Get(email) if !existing { v.clients.Add(user) @@ -177,7 +167,7 @@ func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSessi return nil } -func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection) error { +func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher dispatcher.Interface) error { connection.SetReadDeadline(time.Now().Add(time.Second * 8)) reader := bufio.NewReader(connection) @@ -200,11 +190,13 @@ func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) userSettings := request.User.GetSettings() - ctx = proxy.ContextWithDestination(ctx, request.Destination()) ctx = protocol.ContextWithUser(ctx, request.User) ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout) - ray := v.packetDispatcher.DispatchToOutbound(ctx) + ray, err := dispatcher.Dispatch(ctx, request.Destination()) + if err != nil { + return err + } input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 6e7408c49..f32c2b76e 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -6,11 +6,11 @@ import ( "time" "v2ray.com/core/app" + "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" @@ -47,11 +47,10 @@ func New(ctx context.Context, config *Config) (*VMessOutboundHandler, error) { } // Dispatch implements OutboundHandler.Dispatch(). -func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.OutboundRay) error { +func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { var rec *protocol.ServerSpec var conn internet.Connection - dialer := proxy.DialerFromContext(ctx) err := retry.ExponentialBackoff(5, 100).On(func() error { rec = v.serverPicker.PickServer() rawConn, err := dialer.Dial(ctx, rec.Destination()) diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index f51b6ed6e..969aa056a 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -5,10 +5,9 @@ import ( "sync" "v2ray.com/core/app/dispatcher" - "v2ray.com/core/common/buf" "v2ray.com/core/app/log" + "v2ray.com/core/common/buf" v2net "v2ray.com/core/common/net" - "v2ray.com/core/proxy" "v2ray.com/core/transport/ray" ) @@ -16,14 +15,14 @@ type ResponseCallback func(payload *buf.Buffer) type Dispatcher struct { sync.RWMutex - conns map[string]ray.InboundRay - packetDispatcher dispatcher.Interface + conns map[string]ray.InboundRay + dispatcher dispatcher.Interface } -func NewDispatcher(packetDispatcher dispatcher.Interface) *Dispatcher { +func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher { return &Dispatcher{ - conns: make(map[string]ray.InboundRay), - packetDispatcher: packetDispatcher, + conns: make(map[string]ray.InboundRay), + dispatcher: dispatcher, } } @@ -47,8 +46,8 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest v2net.Destination) } log.Info("UDP|Server: establishing new connection for ", dest) - ctx = proxy.ContextWithDestination(ctx, dest) - return v.packetDispatcher.DispatchToOutbound(ctx), false + inboundRay, _ := v.dispatcher.Dispatch(ctx, dest) + return inboundRay, false } func (v *Dispatcher) Dispatch(ctx context.Context, destination v2net.Destination, payload *buf.Buffer, callback ResponseCallback) {