From 7765fedd782081b1cabe2d83bf5e6be91192f928 Mon Sep 17 00:00:00 2001 From: v2ray Date: Tue, 17 May 2016 23:05:52 -0700 Subject: [PATCH] modulize dispatcher and proxyman --- app/dispatcher/dispatcher.go | 23 --------- app/dispatcher/impl/default.go | 71 +++++++++++++++++++++++++ app/dns/dns.go | 23 --------- app/dns/server.go | 2 +- app/dns/server_test.go | 40 ++++----------- app/proxyman/proxyman.go | 54 +++++++++++++------ app/router/router.go | 4 ++ app/space.go | 60 ++++------------------ app/testing/space.go | 9 ---- shell/point/point.go | 94 ++++++++++++---------------------- 10 files changed, 167 insertions(+), 213 deletions(-) create mode 100644 app/dispatcher/impl/default.go delete mode 100644 app/testing/space.go diff --git a/app/dispatcher/dispatcher.go b/app/dispatcher/dispatcher.go index 8e9626e27..a8902a3db 100644 --- a/app/dispatcher/dispatcher.go +++ b/app/dispatcher/dispatcher.go @@ -14,26 +14,3 @@ const ( type PacketDispatcher interface { DispatchToOutbound(destination v2net.Destination) ray.InboundRay } - -type packetDispatcherWithContext interface { - DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay -} - -type contextedPacketDispatcher struct { - context app.Context - packetDispatcher packetDispatcherWithContext -} - -func (this *contextedPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay { - return this.packetDispatcher.DispatchToOutbound(this.context, destination) -} - -func init() { - app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} { - packetDispatcher := obj.(packetDispatcherWithContext) - return &contextedPacketDispatcher{ - context: context, - packetDispatcher: packetDispatcher, - } - }) -} diff --git a/app/dispatcher/impl/default.go b/app/dispatcher/impl/default.go new file mode 100644 index 000000000..a5435bd52 --- /dev/null +++ b/app/dispatcher/impl/default.go @@ -0,0 +1,71 @@ +package impl + +import ( + "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/app/dns" + "github.com/v2ray/v2ray-core/app/proxyman" + "github.com/v2ray/v2ray-core/app/router" + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/ray" +) + +type DefaultDispatcher struct { + ohm proxyman.OutboundHandlerManager + router router.Router + dns dns.Server +} + +func NewDefaultDispatcher(space app.Space) *DefaultDispatcher { + if !space.HasApp(proxyman.APP_ID_OUTBOUND_MANAGER) { + log.Error("DefaultDispatcher: OutboundHandlerManager is not found in the space.") + return nil + } + if !space.HasApp(dns.APP_ID) { + log.Error("DefaultDispatcher: DNS is not found in the space.") + return nil + } + d := &DefaultDispatcher{ + ohm: space.GetApp(proxyman.APP_ID_OUTBOUND_MANAGER).(proxyman.OutboundHandlerManager), + dns: space.GetApp(dns.APP_ID).(dns.Server), + } + if space.HasApp(router.APP_ID) { + d.router = space.GetApp(router.APP_ID).(router.Router) + } + return d +} + +func (this *DefaultDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay { + direct := ray.NewRay() + dispatcher := this.ohm.GetDefaultHandler() + + if this.router != nil { + if tag, err := this.router.TakeDetour(destination); err == nil { + if handler := this.ohm.GetHandler(tag); handler != nil { + log.Info("DefaultDispatcher: Taking detour [", tag, "] for [", destination, "].") + dispatcher = handler + } else { + log.Warning("DefaultDispatcher: Nonexisting tag: ", tag) + } + } else { + log.Info("DefaultDispatcher: Default route for ", destination) + } + } + + go this.FilterPacketAndDispatch(destination, direct, dispatcher) + + return direct +} + +// @Private +func (this *DefaultDispatcher) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) { + payload, err := link.OutboundInput().Read() + if err != nil { + log.Info("DefaultDispatcher: No payload to dispatch, stopping now.") + link.OutboundInput().Release() + link.OutboundOutput().Release() + return + } + dispatcher.Dispatch(destination, payload, link) +} diff --git a/app/dns/dns.go b/app/dns/dns.go index 1923e5885..9caaefd96 100644 --- a/app/dns/dns.go +++ b/app/dns/dns.go @@ -14,26 +14,3 @@ const ( type Server interface { Get(domain string) []net.IP } - -type dnsServerWithContext interface { - Get(context app.Context, domain string) []net.IP -} - -type contextedDnsServer struct { - context app.Context - dnsCache dnsServerWithContext -} - -func (this *contextedDnsServer) Get(domain string) []net.IP { - return this.dnsCache.Get(this.context, domain) -} - -func init() { - app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} { - dcContext := obj.(dnsServerWithContext) - return &contextedDnsServer{ - context: context, - dnsCache: dcContext, - } - }) -} diff --git a/app/dns/server.go b/app/dns/server.go index 686383afb..c3078b798 100644 --- a/app/dns/server.go +++ b/app/dns/server.go @@ -53,7 +53,7 @@ func (this *CacheServer) GetCached(domain string) []net.IP { return nil } -func (this *CacheServer) Get(context app.Context, domain string) []net.IP { +func (this *CacheServer) Get(domain string) []net.IP { domain = dns.Fqdn(domain) ips := this.GetCached(domain) if ips != nil { diff --git a/app/dns/server_test.go b/app/dns/server_test.go index ba1f47d84..5eb75a5f9 100644 --- a/app/dns/server_test.go +++ b/app/dns/server_test.go @@ -6,44 +6,26 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app/dispatcher" + dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl" . "github.com/v2ray/v2ray-core/app/dns" - apptesting "github.com/v2ray/v2ray-core/app/testing" + "github.com/v2ray/v2ray-core/app/proxyman" v2net "github.com/v2ray/v2ray-core/common/net" netassert "github.com/v2ray/v2ray-core/common/net/testing/assert" "github.com/v2ray/v2ray-core/proxy/freedom" v2testing "github.com/v2ray/v2ray-core/testing" "github.com/v2ray/v2ray-core/testing/assert" - "github.com/v2ray/v2ray-core/transport/ray" ) -type TestDispatcher struct { - freedom *freedom.FreedomConnection -} - -func (this *TestDispatcher) DispatchToOutbound(context app.Context, dest v2net.Destination) ray.InboundRay { - direct := ray.NewRay() - - go func() { - payload, err := direct.OutboundInput().Read() - if err != nil { - direct.OutboundInput().Release() - direct.OutboundOutput().Release() - return - } - this.freedom.Dispatch(dest, payload, direct) - }() - return direct -} - func TestDnsAdd(t *testing.T) { v2testing.Current(t) - d := &TestDispatcher{ - freedom: &freedom.FreedomConnection{}, - } - spaceController := app.NewController() - spaceController.Bind(dispatcher.APP_ID, d) - space := spaceController.ForContext("test") + space := app.NewSpace() + + outboundHandlerManager := &proxyman.DefaultOutboundHandlerManager{} + outboundHandlerManager.SetDefaultHandler(&freedom.FreedomConnection{}) + space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager) + + space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space)) domain := "local.v2ray.com" server := NewCacheServer(space, &Config{ @@ -51,9 +33,7 @@ func TestDnsAdd(t *testing.T) { v2net.UDPDestination(v2net.IPAddress([]byte{8, 8, 8, 8}), v2net.Port(53)), }, }) - ips := server.Get(&apptesting.Context{ - CallerTagValue: "a", - }, domain) + ips := server.Get(domain) assert.Int(len(ips)).Equals(1) netassert.IP(ips[0].To4()).Equals(net.IP([]byte{127, 0, 0, 1})) } diff --git a/app/proxyman/proxyman.go b/app/proxyman/proxyman.go index b496cd69c..831e19acb 100644 --- a/app/proxyman/proxyman.go +++ b/app/proxyman/proxyman.go @@ -1,37 +1,59 @@ package proxyman import ( + "sync" + "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/proxy" ) const ( - APP_ID_INBOUND_MANAGER = app.ID(4) + APP_ID_INBOUND_MANAGER = app.ID(4) + APP_ID_OUTBOUND_MANAGER = app.ID(6) ) type InboundHandlerManager interface { GetHandler(tag string) (proxy.InboundHandler, int) } -type inboundHandlerManagerWithContext interface { - GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) +type OutboundHandlerManager interface { + GetHandler(tag string) proxy.OutboundHandler + GetDefaultHandler() proxy.OutboundHandler } -type inboundHandlerManagerWithContextImpl struct { - context app.Context - manager inboundHandlerManagerWithContext +type DefaultOutboundHandlerManager struct { + sync.RWMutex + defaultHandler proxy.OutboundHandler + taggedHandler map[string]proxy.OutboundHandler } -func (this *inboundHandlerManagerWithContextImpl) GetHandler(tag string) (proxy.InboundHandler, int) { - return this.manager.GetHandler(this.context, tag) +func (this *DefaultOutboundHandlerManager) GetDefaultHandler() proxy.OutboundHandler { + this.RLock() + defer this.RUnlock() + if this.defaultHandler == nil { + return nil + } + return this.defaultHandler } -func init() { - app.Register(APP_ID_INBOUND_MANAGER, func(context app.Context, obj interface{}) interface{} { - manager := obj.(inboundHandlerManagerWithContext) - return &inboundHandlerManagerWithContextImpl{ - context: context, - manager: manager, - } - }) +func (this *DefaultOutboundHandlerManager) SetDefaultHandler(handler proxy.OutboundHandler) { + this.Lock() + defer this.Unlock() + this.defaultHandler = handler +} + +func (this *DefaultOutboundHandlerManager) GetHandler(tag string) proxy.OutboundHandler { + this.RLock() + defer this.RUnlock() + if handler, found := this.taggedHandler[tag]; found { + return handler + } + return nil +} + +func (this *DefaultOutboundHandlerManager) SetHandler(tag string, handler proxy.OutboundHandler) { + this.Lock() + defer this.Unlock() + + this.taggedHandler[tag] = handler } diff --git a/app/router/router.go b/app/router/router.go index 857e69c36..b3a101749 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -5,6 +5,10 @@ import ( v2net "github.com/v2ray/v2ray-core/common/net" ) +const ( + APP_ID = app.ID(3) +) + type Router interface { TakeDetour(v2net.Destination) (string, error) } diff --git a/app/space.go b/app/space.go index 5a01bc162..43cdbb484 100644 --- a/app/space.go +++ b/app/space.go @@ -7,52 +7,26 @@ type Context interface { CallerTag() string } +type Caller interface { + Tag() string +} + // A Space contains all apps that may be available in a V2Ray runtime. // Caller must check the availability of an app by calling HasXXX before getting its instance. type Space interface { HasApp(ID) bool GetApp(ID) interface{} -} - -type ForContextCreator func(Context, interface{}) interface{} - -var ( - metadataCache = make(map[ID]ForContextCreator) -) - -func Register(id ID, creator ForContextCreator) { - // TODO: check id - metadataCache[id] = creator -} - -type contextImpl struct { - callerTag string -} - -func (this *contextImpl) CallerTag() string { - return this.callerTag + BindApp(ID, interface{}) } type spaceImpl struct { cache map[ID]interface{} - tag string } -func newSpaceImpl(tag string, cache map[ID]interface{}) *spaceImpl { - space := &spaceImpl{ - tag: tag, +func NewSpace() Space { + return &spaceImpl{ cache: make(map[ID]interface{}), } - context := &contextImpl{ - callerTag: tag, - } - for id, object := range cache { - creator, found := metadataCache[id] - if found { - space.cache[id] = creator(context, object) - } - } - return space } func (this *spaceImpl) HasApp(id ID) bool { @@ -68,22 +42,6 @@ func (this *spaceImpl) GetApp(id ID) interface{} { return obj } -// A SpaceController is supposed to be used by a shell to create Spaces. It should not be used -// directly by proxies. -type SpaceController struct { - objectCache map[ID]interface{} -} - -func NewController() *SpaceController { - return &SpaceController{ - objectCache: make(map[ID]interface{}), - } -} - -func (this *SpaceController) Bind(id ID, object interface{}) { - this.objectCache[id] = object -} - -func (this *SpaceController) ForContext(tag string) Space { - return newSpaceImpl(tag, this.objectCache) +func (this *spaceImpl) BindApp(id ID, object interface{}) { + this.cache[id] = object } diff --git a/app/testing/space.go b/app/testing/space.go deleted file mode 100644 index 288e9c2ab..000000000 --- a/app/testing/space.go +++ /dev/null @@ -1,9 +0,0 @@ -package testing - -type Context struct { - CallerTagValue string -} - -func (this *Context) CallerTag() string { - return this.CallerTagValue -} diff --git a/shell/point/point.go b/shell/point/point.go index d11411748..b66ec0589 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -7,6 +7,7 @@ package point import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app/dispatcher" + dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl" "github.com/v2ray/v2ray-core/app/dns" "github.com/v2ray/v2ray-core/app/proxyman" "github.com/v2ray/v2ray-core/app/router" @@ -15,7 +16,6 @@ import ( "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" proxyrepo "github.com/v2ray/v2ray-core/proxy/repo" - "github.com/v2ray/v2ray-core/transport/ray" ) // Point shell of V2Ray. @@ -27,7 +27,7 @@ type Point struct { taggedIdh map[string]InboundDetourHandler odh map[string]proxy.OutboundHandler router router.Router - space *app.SpaceController + space app.Space } // NewPoint returns a new Point server based on given configuration. @@ -55,12 +55,33 @@ func NewPoint(pConfig *Config) (*Point, error) { log.SetLogLevel(logConfig.LogLevel) } - vpoint.space = app.NewController() - vpoint.space.Bind(dispatcher.APP_ID, vpoint) - vpoint.space.Bind(proxyman.APP_ID_INBOUND_MANAGER, vpoint) + vpoint.space = app.NewSpace() + vpoint.space.BindApp(proxyman.APP_ID_INBOUND_MANAGER, vpoint) + + outboundHandlerManager := &proxyman.DefaultOutboundHandlerManager{} + vpoint.space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager) + + dnsConfig := pConfig.DNSConfig + if dnsConfig != nil { + dnsServer := dns.NewCacheServer(vpoint.space, dnsConfig) + vpoint.space.BindApp(dns.APP_ID, dnsServer) + } + + routerConfig := pConfig.RouterConfig + if routerConfig != nil { + r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space) + if err != nil { + log.Error("Failed to create router: ", err) + return nil, ErrorBadConfiguration + } + vpoint.space.BindApp(router.APP_ID, r) + vpoint.router = r + } + + vpoint.space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(vpoint.space)) ichConfig := pConfig.InboundConfig.Settings - ich, err := proxyrepo.CreateInboundHandler(pConfig.InboundConfig.Protocol, vpoint.space.ForContext("vpoint-default-inbound"), ichConfig) + ich, err := proxyrepo.CreateInboundHandler(pConfig.InboundConfig.Protocol, vpoint.space, ichConfig) if err != nil { log.Error("Failed to create inbound connection handler: ", err) return nil, err @@ -68,12 +89,13 @@ func NewPoint(pConfig *Config) (*Point, error) { vpoint.ich = ich ochConfig := pConfig.OutboundConfig.Settings - och, err := proxyrepo.CreateOutboundHandler(pConfig.OutboundConfig.Protocol, vpoint.space.ForContext("vpoint-default-outbound"), ochConfig) + och, err := proxyrepo.CreateOutboundHandler(pConfig.OutboundConfig.Protocol, vpoint.space, ochConfig) if err != nil { log.Error("Failed to create outbound connection handler: ", err) return nil, err } vpoint.och = och + outboundHandlerManager.SetDefaultHandler(och) vpoint.taggedIdh = make(map[string]InboundDetourHandler) detours := pConfig.InboundDetours @@ -84,14 +106,14 @@ func NewPoint(pConfig *Config) (*Point, error) { var detourHandler InboundDetourHandler switch allocConfig.Strategy { case AllocationStrategyAlways: - dh, err := NewInboundDetourHandlerAlways(vpoint.space.ForContext(detourConfig.Tag), detourConfig) + dh, err := NewInboundDetourHandlerAlways(vpoint.space, detourConfig) if err != nil { log.Error("Point: Failed to create detour handler: ", err) return nil, ErrorBadConfiguration } detourHandler = dh case AllocationStrategyRandom: - dh, err := NewInboundDetourHandlerDynamic(vpoint.space.ForContext(detourConfig.Tag), detourConfig) + dh, err := NewInboundDetourHandlerDynamic(vpoint.space, detourConfig) if err != nil { log.Error("Point: Failed to create detour handler: ", err) return nil, ErrorBadConfiguration @@ -112,31 +134,16 @@ func NewPoint(pConfig *Config) (*Point, error) { if len(outboundDetours) > 0 { vpoint.odh = make(map[string]proxy.OutboundHandler) for _, detourConfig := range outboundDetours { - detourHandler, err := proxyrepo.CreateOutboundHandler(detourConfig.Protocol, vpoint.space.ForContext(detourConfig.Tag), detourConfig.Settings) + detourHandler, err := proxyrepo.CreateOutboundHandler(detourConfig.Protocol, vpoint.space, detourConfig.Settings) if err != nil { log.Error("Failed to create detour outbound connection handler: ", err) return nil, err } vpoint.odh[detourConfig.Tag] = detourHandler + outboundHandlerManager.SetHandler(detourConfig.Tag, detourHandler) } } - dnsConfig := pConfig.DNSConfig - if dnsConfig != nil { - dnsServer := dns.NewCacheServer(vpoint.space.ForContext("system.dns"), dnsConfig) - vpoint.space.Bind(dns.APP_ID, dnsServer) - } - - routerConfig := pConfig.RouterConfig - if routerConfig != nil { - r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space.ForContext("system.router")) - if err != nil { - log.Error("Failed to create router: ", err) - return nil, ErrorBadConfiguration - } - vpoint.router = r - } - return vpoint, nil } @@ -177,40 +184,7 @@ func (this *Point) Start() error { return nil } -// Dispatches a Packet to an OutboundConnection. -// The packet will be passed through the router (if configured), and then sent to an outbound -// connection with matching tag. -func (this *Point) DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay { - direct := ray.NewRay() - dispatcher := this.och - - if this.router != nil { - if tag, err := this.router.TakeDetour(destination); err == nil { - if handler, found := this.odh[tag]; found { - log.Info("Point: Taking detour [", tag, "] for [", destination, "]") - dispatcher = handler - } else { - log.Warning("Point: Unable to find routing destination: ", tag) - } - } - } - - go this.FilterPacketAndDispatch(destination, direct, dispatcher) - return direct -} - -func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) { - payload, err := link.OutboundInput().Read() - if err != nil { - log.Info("Point: No payload to dispatch, stopping dispatching now.") - link.OutboundOutput().Release() - link.OutboundInput().Release() - return - } - dispatcher.Dispatch(destination, payload, link) -} - -func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) { +func (this *Point) GetHandler(tag string) (proxy.InboundHandler, int) { handler, found := this.taggedIdh[tag] if !found { log.Warning("Point: Unable to find an inbound handler with tag: ", tag)