mirror of
https://github.com/v2fly/v2ray-core.git
synced 2024-12-12 05:06:32 -05:00
refactor app.Space
This commit is contained in:
parent
fcf8a74a3a
commit
2031c13a7f
@ -1,41 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/internal"
|
||||
)
|
||||
|
||||
// A SpaceController is supposed to be used by a shell to create Spaces. It should not be used
|
||||
// directly by proxies.
|
||||
type SpaceController struct {
|
||||
packetDispatcher internal.PacketDispatcherWithContext
|
||||
dnsCache internal.DnsCacheWithContext
|
||||
pubsub internal.PubsubWithContext
|
||||
inboundHandlerManager internal.InboundHandlerManagerWithContext
|
||||
}
|
||||
|
||||
func New() *SpaceController {
|
||||
return new(SpaceController)
|
||||
}
|
||||
|
||||
func (this *SpaceController) Bind(object interface{}) {
|
||||
if packetDispatcher, ok := object.(internal.PacketDispatcherWithContext); ok {
|
||||
this.packetDispatcher = packetDispatcher
|
||||
}
|
||||
|
||||
if dnsCache, ok := object.(internal.DnsCacheWithContext); ok {
|
||||
this.dnsCache = dnsCache
|
||||
}
|
||||
|
||||
if pubsub, ok := object.(internal.PubsubWithContext); ok {
|
||||
this.pubsub = pubsub
|
||||
}
|
||||
|
||||
if inboundHandlerManager, ok := object.(internal.InboundHandlerManagerWithContext); ok {
|
||||
this.inboundHandlerManager = inboundHandlerManager
|
||||
}
|
||||
}
|
||||
|
||||
func (this *SpaceController) ForContext(tag string) app.Space {
|
||||
return internal.NewSpace(tag, this.packetDispatcher, this.dnsCache, this.pubsub, this.inboundHandlerManager)
|
||||
}
|
39
app/dispatcher/dispatcher.go
Normal file
39
app/dispatcher/dispatcher.go
Normal file
@ -0,0 +1,39 @@
|
||||
package dispatcher
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/transport/ray"
|
||||
)
|
||||
|
||||
const (
|
||||
APP_ID = app.ID(1)
|
||||
)
|
||||
|
||||
// PacketDispatcher dispatch a packet and possibly further network payload to its destination.
|
||||
type PacketDispatcher interface {
|
||||
DispatchToOutbound(packet v2net.Packet) ray.InboundRay
|
||||
}
|
||||
|
||||
type packetDispatcherWithContext interface {
|
||||
DispatchToOutbound(context app.Context, packet v2net.Packet) ray.InboundRay
|
||||
}
|
||||
|
||||
type contextedPacketDispatcher struct {
|
||||
context app.Context
|
||||
packetDispatcher packetDispatcherWithContext
|
||||
}
|
||||
|
||||
func (this *contextedPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay {
|
||||
return this.packetDispatcher.DispatchToOutbound(this.context, packet)
|
||||
}
|
||||
|
||||
func init() {
|
||||
app.RegisterApp(APP_ID, func(context app.Context, obj interface{}) interface{} {
|
||||
packetDispatcher := obj.(packetDispatcherWithContext)
|
||||
return &contextedPacketDispatcher{
|
||||
context: context,
|
||||
packetDispatcher: packetDispatcher,
|
||||
}
|
||||
})
|
||||
}
|
11
app/dns.go
11
app/dns.go
@ -1,11 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"net"
|
||||
)
|
||||
|
||||
// A DnsCache is an internal cache of DNS resolutions.
|
||||
type DnsCache interface {
|
||||
Get(domain string) net.IP
|
||||
Add(domain string, ip net.IP)
|
||||
}
|
@ -2,61 +2,44 @@ package dns
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/common/collect"
|
||||
"github.com/v2ray/v2ray-core/common/serial"
|
||||
)
|
||||
|
||||
type entry struct {
|
||||
domain string
|
||||
ip net.IP
|
||||
validUntil time.Time
|
||||
const (
|
||||
APP_ID = app.ID(2)
|
||||
)
|
||||
|
||||
// A DnsCache is an internal cache of DNS resolutions.
|
||||
type DnsCache interface {
|
||||
Get(domain string) net.IP
|
||||
Add(domain string, ip net.IP)
|
||||
}
|
||||
|
||||
func newEntry(domain string, ip net.IP) *entry {
|
||||
this := &entry{
|
||||
domain: domain,
|
||||
ip: ip,
|
||||
}
|
||||
this.Extend()
|
||||
return this
|
||||
type dnsCacheWithContext interface {
|
||||
Get(context app.Context, domain string) net.IP
|
||||
Add(contaxt app.Context, domain string, ip net.IP)
|
||||
}
|
||||
|
||||
func (this *entry) IsValid() bool {
|
||||
return this.validUntil.After(time.Now())
|
||||
type contextedDnsCache struct {
|
||||
context app.Context
|
||||
dnsCache dnsCacheWithContext
|
||||
}
|
||||
|
||||
func (this *entry) Extend() {
|
||||
this.validUntil = time.Now().Add(time.Hour)
|
||||
func (this *contextedDnsCache) Get(domain string) net.IP {
|
||||
return this.dnsCache.Get(this.context, domain)
|
||||
}
|
||||
|
||||
type DnsCache struct {
|
||||
cache *collect.ValidityMap
|
||||
config *CacheConfig
|
||||
func (this *contextedDnsCache) Add(domain string, ip net.IP) {
|
||||
this.dnsCache.Add(this.context, domain, ip)
|
||||
}
|
||||
|
||||
func NewCache(config *CacheConfig) *DnsCache {
|
||||
cache := &DnsCache{
|
||||
cache: collect.NewValidityMap(3600),
|
||||
config: config,
|
||||
}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (this *DnsCache) Add(context app.Context, domain string, ip net.IP) {
|
||||
callerTag := context.CallerTag()
|
||||
if !this.config.IsTrustedSource(serial.StringLiteral(callerTag)) {
|
||||
return
|
||||
}
|
||||
|
||||
this.cache.Set(serial.StringLiteral(domain), newEntry(domain, ip))
|
||||
}
|
||||
|
||||
func (this *DnsCache) Get(context app.Context, domain string) net.IP {
|
||||
if value := this.cache.Get(serial.StringLiteral(domain)); value != nil {
|
||||
return value.(*entry).ip
|
||||
}
|
||||
return nil
|
||||
func init() {
|
||||
app.RegisterApp(APP_ID, func(context app.Context, obj interface{}) interface{} {
|
||||
dcContext := obj.(dnsCacheWithContext)
|
||||
return &contextedDnsCache{
|
||||
context: context,
|
||||
dnsCache: dcContext,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package dns
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/common/serial"
|
@ -1,6 +1,6 @@
|
||||
// +build json
|
||||
|
||||
package dns
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
62
app/dns/internal/dns.go
Normal file
62
app/dns/internal/dns.go
Normal file
@ -0,0 +1,62 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/common/collect"
|
||||
"github.com/v2ray/v2ray-core/common/serial"
|
||||
)
|
||||
|
||||
type entry struct {
|
||||
domain string
|
||||
ip net.IP
|
||||
validUntil time.Time
|
||||
}
|
||||
|
||||
func newEntry(domain string, ip net.IP) *entry {
|
||||
this := &entry{
|
||||
domain: domain,
|
||||
ip: ip,
|
||||
}
|
||||
this.Extend()
|
||||
return this
|
||||
}
|
||||
|
||||
func (this *entry) IsValid() bool {
|
||||
return this.validUntil.After(time.Now())
|
||||
}
|
||||
|
||||
func (this *entry) Extend() {
|
||||
this.validUntil = time.Now().Add(time.Hour)
|
||||
}
|
||||
|
||||
type DnsCache struct {
|
||||
cache *collect.ValidityMap
|
||||
config *CacheConfig
|
||||
}
|
||||
|
||||
func NewCache(config *CacheConfig) *DnsCache {
|
||||
cache := &DnsCache{
|
||||
cache: collect.NewValidityMap(3600),
|
||||
config: config,
|
||||
}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (this *DnsCache) Add(context app.Context, domain string, ip net.IP) {
|
||||
callerTag := context.CallerTag()
|
||||
if !this.config.IsTrustedSource(serial.StringLiteral(callerTag)) {
|
||||
return
|
||||
}
|
||||
|
||||
this.cache.Set(serial.StringLiteral(domain), newEntry(domain, ip))
|
||||
}
|
||||
|
||||
func (this *DnsCache) Get(context app.Context, domain string) net.IP {
|
||||
if value := this.cache.Get(serial.StringLiteral(domain)); value != nil {
|
||||
return value.(*entry).ip
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
package dns_test
|
||||
package internal_test
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app/dns"
|
||||
. "github.com/v2ray/v2ray-core/app/dns/internal"
|
||||
apptesting "github.com/v2ray/v2ray-core/app/testing"
|
||||
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
|
||||
"github.com/v2ray/v2ray-core/common/serial"
|
||||
@ -15,7 +15,7 @@ func TestDnsAdd(t *testing.T) {
|
||||
v2testing.Current(t)
|
||||
|
||||
domain := "v2ray.com"
|
||||
cache := dns.NewCache(&dns.CacheConfig{
|
||||
cache := NewCache(&CacheConfig{
|
||||
TrustedTags: map[serial.StringLiteral]bool{
|
||||
serial.StringLiteral("testtag"): true,
|
||||
},
|
@ -1,9 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
)
|
||||
|
||||
type InboundHandlerManager interface {
|
||||
GetHandler(tag string) (proxy.InboundHandler, int)
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
package internal
|
||||
|
||||
type contextImpl struct {
|
||||
callerTag string
|
||||
}
|
||||
|
||||
func (this *contextImpl) CallerTag() string {
|
||||
return this.callerTag
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
)
|
||||
|
||||
type DnsCacheWithContext interface {
|
||||
Get(context app.Context, domain string) net.IP
|
||||
Add(contaxt app.Context, domain string, ip net.IP)
|
||||
}
|
||||
|
||||
type contextedDnsCache struct {
|
||||
context app.Context
|
||||
dnsCache DnsCacheWithContext
|
||||
}
|
||||
|
||||
func (this *contextedDnsCache) Get(domain string) net.IP {
|
||||
return this.dnsCache.Get(this.context, domain)
|
||||
}
|
||||
|
||||
func (this *contextedDnsCache) Add(domain string, ip net.IP) {
|
||||
this.dnsCache.Add(this.context, domain, ip)
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
)
|
||||
|
||||
type InboundHandlerManagerWithContext interface {
|
||||
GetHandler(context app.Context, tag string) (proxy.InboundHandler, int)
|
||||
}
|
||||
|
||||
type inboundHandlerManagerWithContext struct {
|
||||
context app.Context
|
||||
manager InboundHandlerManagerWithContext
|
||||
}
|
||||
|
||||
func (this *inboundHandlerManagerWithContext) GetHandler(tag string) (proxy.InboundHandler, int) {
|
||||
return this.manager.GetHandler(this.context, tag)
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/transport/ray"
|
||||
)
|
||||
|
||||
type PacketDispatcherWithContext interface {
|
||||
DispatchToOutbound(context app.Context, packet v2net.Packet) ray.InboundRay
|
||||
}
|
||||
|
||||
type contextedPacketDispatcher struct {
|
||||
context app.Context
|
||||
packetDispatcher PacketDispatcherWithContext
|
||||
}
|
||||
|
||||
func (this *contextedPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay {
|
||||
return this.packetDispatcher.DispatchToOutbound(this.context, packet)
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
)
|
||||
|
||||
type PubsubWithContext interface {
|
||||
Publish(context app.Context, topic string, message app.PubsubMessage)
|
||||
Subscribe(context app.Context, topic string, handler app.TopicHandler)
|
||||
}
|
||||
|
||||
type contextedPubsub struct {
|
||||
context app.Context
|
||||
pubsub PubsubWithContext
|
||||
}
|
||||
|
||||
func (this *contextedPubsub) Publish(topic string, message app.PubsubMessage) {
|
||||
this.pubsub.Publish(this.context, topic, message)
|
||||
}
|
||||
|
||||
func (this *contextedPubsub) Subscribe(topic string, handler app.TopicHandler) {
|
||||
this.pubsub.Subscribe(this.context, topic, handler)
|
||||
}
|
@ -1,75 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
)
|
||||
|
||||
type Space struct {
|
||||
packetDispatcher PacketDispatcherWithContext
|
||||
dnsCache DnsCacheWithContext
|
||||
pubsub PubsubWithContext
|
||||
inboundHandlerManager InboundHandlerManagerWithContext
|
||||
tag string
|
||||
}
|
||||
|
||||
func NewSpace(tag string, packetDispatcher PacketDispatcherWithContext, dnsCache DnsCacheWithContext, pubsub PubsubWithContext, inboundHandlerManager InboundHandlerManagerWithContext) *Space {
|
||||
return &Space{
|
||||
tag: tag,
|
||||
packetDispatcher: packetDispatcher,
|
||||
dnsCache: dnsCache,
|
||||
pubsub: pubsub,
|
||||
inboundHandlerManager: inboundHandlerManager,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Space) HasPacketDispatcher() bool {
|
||||
return this.packetDispatcher != nil
|
||||
}
|
||||
|
||||
func (this *Space) PacketDispatcher() app.PacketDispatcher {
|
||||
return &contextedPacketDispatcher{
|
||||
packetDispatcher: this.packetDispatcher,
|
||||
context: &contextImpl{
|
||||
callerTag: this.tag,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Space) HasDnsCache() bool {
|
||||
return this.dnsCache != nil
|
||||
}
|
||||
|
||||
func (this *Space) DnsCache() app.DnsCache {
|
||||
return &contextedDnsCache{
|
||||
dnsCache: this.dnsCache,
|
||||
context: &contextImpl{
|
||||
callerTag: this.tag,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Space) HasPubsub() bool {
|
||||
return this.pubsub != nil
|
||||
}
|
||||
|
||||
func (this *Space) Pubsub() app.Pubsub {
|
||||
return &contextedPubsub{
|
||||
pubsub: this.pubsub,
|
||||
context: &contextImpl{
|
||||
callerTag: this.tag,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Space) HasInboundHandlerManager() bool {
|
||||
return this.inboundHandlerManager != nil
|
||||
}
|
||||
|
||||
func (this *Space) InboundHandlerManager() app.InboundHandlerManager {
|
||||
return &inboundHandlerManagerWithContext{
|
||||
manager: this.inboundHandlerManager,
|
||||
context: &contextImpl{
|
||||
callerTag: this.tag,
|
||||
},
|
||||
}
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/transport/ray"
|
||||
)
|
||||
|
||||
// PacketDispatcher dispatch a packet and possibly further network payload to its destination.
|
||||
type PacketDispatcher interface {
|
||||
DispatchToOutbound(packet v2net.Packet) ray.InboundRay
|
||||
}
|
37
app/proxyman/proxyman.go
Normal file
37
app/proxyman/proxyman.go
Normal file
@ -0,0 +1,37 @@
|
||||
package proxyman
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
)
|
||||
|
||||
const (
|
||||
APP_ID_INBOUND_MANAGER = app.ID(4)
|
||||
)
|
||||
|
||||
type InboundHandlerManager interface {
|
||||
GetHandler(tag string) (proxy.InboundHandler, int)
|
||||
}
|
||||
|
||||
type inboundHandlerManagerWithContext interface {
|
||||
GetHandler(context app.Context, tag string) (proxy.InboundHandler, int)
|
||||
}
|
||||
|
||||
type inboundHandlerManagerWithContextImpl struct {
|
||||
context app.Context
|
||||
manager inboundHandlerManagerWithContext
|
||||
}
|
||||
|
||||
func (this *inboundHandlerManagerWithContextImpl) GetHandler(tag string) (proxy.InboundHandler, int) {
|
||||
return this.manager.GetHandler(this.context, tag)
|
||||
}
|
||||
|
||||
func init() {
|
||||
app.RegisterApp(APP_ID_INBOUND_MANAGER, func(context app.Context, obj interface{}) interface{} {
|
||||
manager := obj.(inboundHandlerManagerWithContext)
|
||||
return &inboundHandlerManagerWithContextImpl{
|
||||
context: context,
|
||||
manager: manager,
|
||||
}
|
||||
})
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
package app
|
||||
|
||||
type PubsubMessage []byte
|
||||
type TopicHandler func(PubsubMessage)
|
||||
|
||||
type Pubsub interface {
|
||||
Publish(topic string, message PubsubMessage)
|
||||
Subscribe(topic string, handler TopicHandler)
|
||||
}
|
64
app/pubsub/internal/pubsub.go
Normal file
64
app/pubsub/internal/pubsub.go
Normal file
@ -0,0 +1,64 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/pubsub"
|
||||
)
|
||||
|
||||
type TopicHandlerList struct {
|
||||
sync.RWMutex
|
||||
handlers []pubsub.TopicHandler
|
||||
}
|
||||
|
||||
func NewTopicHandlerList(handlers ...pubsub.TopicHandler) *TopicHandlerList {
|
||||
return &TopicHandlerList{
|
||||
handlers: handlers,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *TopicHandlerList) Add(handler pubsub.TopicHandler) {
|
||||
this.Lock()
|
||||
this.handlers = append(this.handlers, handler)
|
||||
this.Unlock()
|
||||
}
|
||||
|
||||
func (this *TopicHandlerList) Dispatch(message pubsub.PubsubMessage) {
|
||||
this.RLock()
|
||||
for _, handler := range this.handlers {
|
||||
go handler(message)
|
||||
}
|
||||
this.RUnlock()
|
||||
}
|
||||
|
||||
type Pubsub struct {
|
||||
topics map[string]*TopicHandlerList
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func New() *Pubsub {
|
||||
return &Pubsub{
|
||||
topics: make(map[string]*TopicHandlerList),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Pubsub) Publish(context app.Context, topic string, message pubsub.PubsubMessage) {
|
||||
this.RLock()
|
||||
list, found := this.topics[topic]
|
||||
this.RUnlock()
|
||||
|
||||
if found {
|
||||
list.Dispatch(message)
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Pubsub) Subscribe(context app.Context, topic string, handler pubsub.TopicHandler) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
if list, found := this.topics[topic]; found {
|
||||
list.Add(handler)
|
||||
} else {
|
||||
this.topics[topic] = NewTopicHandlerList(handler)
|
||||
}
|
||||
}
|
@ -1,11 +1,11 @@
|
||||
package pubsub_test
|
||||
package internal_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
. "github.com/v2ray/v2ray-core/app/pubsub"
|
||||
"github.com/v2ray/v2ray-core/app/pubsub"
|
||||
. "github.com/v2ray/v2ray-core/app/pubsub/internal"
|
||||
apptesting "github.com/v2ray/v2ray-core/app/testing"
|
||||
v2testing "github.com/v2ray/v2ray-core/testing"
|
||||
"github.com/v2ray/v2ray-core/testing/assert"
|
||||
@ -14,19 +14,19 @@ import (
|
||||
func TestPubsub(t *testing.T) {
|
||||
v2testing.Current(t)
|
||||
|
||||
messages := make(map[string]app.PubsubMessage)
|
||||
messages := make(map[string]pubsub.PubsubMessage)
|
||||
|
||||
pubsub := New()
|
||||
pubsub.Subscribe(&apptesting.Context{}, "t1", func(message app.PubsubMessage) {
|
||||
ps := New()
|
||||
ps.Subscribe(&apptesting.Context{}, "t1", func(message pubsub.PubsubMessage) {
|
||||
messages["t1"] = message
|
||||
})
|
||||
|
||||
pubsub.Subscribe(&apptesting.Context{}, "t2", func(message app.PubsubMessage) {
|
||||
ps.Subscribe(&apptesting.Context{}, "t2", func(message pubsub.PubsubMessage) {
|
||||
messages["t2"] = message
|
||||
})
|
||||
|
||||
message := app.PubsubMessage([]byte("This is a pubsub message."))
|
||||
pubsub.Publish(&apptesting.Context{}, "t2", message)
|
||||
message := pubsub.PubsubMessage([]byte("This is a pubsub message."))
|
||||
ps.Publish(&apptesting.Context{}, "t2", message)
|
||||
<-time.Tick(time.Second)
|
||||
|
||||
_, found := messages["t1"]
|
@ -1,64 +1,45 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/internal"
|
||||
)
|
||||
|
||||
type TopicHandlerList struct {
|
||||
sync.RWMutex
|
||||
handlers []app.TopicHandler
|
||||
const (
|
||||
APP_ID = app.ID(3)
|
||||
)
|
||||
|
||||
type PubsubMessage []byte
|
||||
type TopicHandler func(PubsubMessage)
|
||||
|
||||
type Pubsub interface {
|
||||
Publish(topic string, message PubsubMessage)
|
||||
Subscribe(topic string, handler TopicHandler)
|
||||
}
|
||||
|
||||
func NewTopicHandlerList(handlers ...app.TopicHandler) *TopicHandlerList {
|
||||
return &TopicHandlerList{
|
||||
handlers: handlers,
|
||||
}
|
||||
type pubsubWithContext interface {
|
||||
Publish(context app.Context, topic string, message PubsubMessage)
|
||||
Subscribe(context app.Context, topic string, handler TopicHandler)
|
||||
}
|
||||
|
||||
func (this *TopicHandlerList) Add(handler app.TopicHandler) {
|
||||
this.Lock()
|
||||
this.handlers = append(this.handlers, handler)
|
||||
this.Unlock()
|
||||
type contextedPubsub struct {
|
||||
context app.Context
|
||||
pubsub pubsubWithContext
|
||||
}
|
||||
|
||||
func (this *TopicHandlerList) Dispatch(message app.PubsubMessage) {
|
||||
this.RLock()
|
||||
for _, handler := range this.handlers {
|
||||
go handler(message)
|
||||
}
|
||||
this.RUnlock()
|
||||
func (this *contextedPubsub) Publish(topic string, message PubsubMessage) {
|
||||
this.pubsub.Publish(this.context, topic, message)
|
||||
}
|
||||
|
||||
type Pubsub struct {
|
||||
topics map[string]*TopicHandlerList
|
||||
sync.RWMutex
|
||||
func (this *contextedPubsub) Subscribe(topic string, handler TopicHandler) {
|
||||
this.pubsub.Subscribe(this.context, topic, handler)
|
||||
}
|
||||
|
||||
func New() internal.PubsubWithContext {
|
||||
return &Pubsub{
|
||||
topics: make(map[string]*TopicHandlerList),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Pubsub) Publish(context app.Context, topic string, message app.PubsubMessage) {
|
||||
this.RLock()
|
||||
list, found := this.topics[topic]
|
||||
this.RUnlock()
|
||||
|
||||
if found {
|
||||
list.Dispatch(message)
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Pubsub) Subscribe(context app.Context, topic string, handler app.TopicHandler) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
if list, found := this.topics[topic]; found {
|
||||
list.Add(handler)
|
||||
} else {
|
||||
this.topics[topic] = NewTopicHandlerList(handler)
|
||||
}
|
||||
func init() {
|
||||
app.RegisterApp(APP_ID, func(context app.Context, obj interface{}) interface{} {
|
||||
pubsub := obj.(pubsubWithContext)
|
||||
return &contextedPubsub{
|
||||
context: context,
|
||||
pubsub: pubsub,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
96
app/space.go
96
app/space.go
@ -1,5 +1,14 @@
|
||||
package app
|
||||
|
||||
type ID int
|
||||
|
||||
const (
|
||||
PACKET_DISPATCHER = ID(1)
|
||||
DNS_CACHE = ID(2)
|
||||
PUBSUB = ID(3)
|
||||
INBOUND_HANDLER_MANAGER = ID(4)
|
||||
)
|
||||
|
||||
// Context of a function call from proxy to app.
|
||||
type Context interface {
|
||||
CallerTag() string
|
||||
@ -8,15 +17,80 @@ type Context interface {
|
||||
// A Space contains all apps that may be available in a V2Ray runtime.
|
||||
// Caller must check the availability of an app by calling HasXXX before getting its instance.
|
||||
type Space interface {
|
||||
HasPacketDispatcher() bool
|
||||
PacketDispatcher() PacketDispatcher
|
||||
|
||||
HasDnsCache() bool
|
||||
DnsCache() DnsCache
|
||||
|
||||
HasPubsub() bool
|
||||
Pubsub() Pubsub
|
||||
|
||||
HasInboundHandlerManager() bool
|
||||
InboundHandlerManager() InboundHandlerManager
|
||||
HasApp(ID) bool
|
||||
GetApp(ID) interface{}
|
||||
}
|
||||
|
||||
type ForContextCreator func(Context, interface{}) interface{}
|
||||
|
||||
var (
|
||||
metadataCache = make(map[ID]ForContextCreator)
|
||||
)
|
||||
|
||||
func RegisterApp(id ID, creator ForContextCreator) {
|
||||
// TODO: check id
|
||||
metadataCache[id] = creator
|
||||
}
|
||||
|
||||
type contextImpl struct {
|
||||
callerTag string
|
||||
}
|
||||
|
||||
func (this *contextImpl) CallerTag() string {
|
||||
return this.callerTag
|
||||
}
|
||||
|
||||
type spaceImpl struct {
|
||||
cache map[ID]interface{}
|
||||
tag string
|
||||
}
|
||||
|
||||
func newSpaceImpl(tag string, cache map[ID]interface{}) *spaceImpl {
|
||||
space := &spaceImpl{
|
||||
tag: tag,
|
||||
cache: make(map[ID]interface{}),
|
||||
}
|
||||
context := &contextImpl{
|
||||
callerTag: tag,
|
||||
}
|
||||
for id, object := range cache {
|
||||
creator, found := metadataCache[id]
|
||||
if found {
|
||||
space.cache[id] = creator(context, object)
|
||||
}
|
||||
}
|
||||
return space
|
||||
}
|
||||
|
||||
func (this *spaceImpl) HasApp(id ID) bool {
|
||||
_, found := this.cache[id]
|
||||
return found
|
||||
}
|
||||
|
||||
func (this *spaceImpl) GetApp(id ID) interface{} {
|
||||
obj, found := this.cache[id]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
||||
// A SpaceController is supposed to be used by a shell to create Spaces. It should not be used
|
||||
// directly by proxies.
|
||||
type SpaceController struct {
|
||||
objectCache map[ID]interface{}
|
||||
}
|
||||
|
||||
func NewController() *SpaceController {
|
||||
return &SpaceController{
|
||||
objectCache: make(map[ID]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *SpaceController) Bind(id ID, object interface{}) {
|
||||
this.objectCache[id] = object
|
||||
}
|
||||
|
||||
func (this *SpaceController) ForContext(tag string) Space {
|
||||
return newSpaceImpl(tag, this.objectCache)
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
@ -14,24 +14,24 @@ import (
|
||||
)
|
||||
|
||||
type DokodemoDoor struct {
|
||||
tcpMutex sync.RWMutex
|
||||
udpMutex sync.RWMutex
|
||||
config *Config
|
||||
accepting bool
|
||||
address v2net.Address
|
||||
port v2net.Port
|
||||
space app.Space
|
||||
tcpListener *hub.TCPHub
|
||||
udpHub *hub.UDPHub
|
||||
listeningPort v2net.Port
|
||||
tcpMutex sync.RWMutex
|
||||
udpMutex sync.RWMutex
|
||||
config *Config
|
||||
accepting bool
|
||||
address v2net.Address
|
||||
port v2net.Port
|
||||
packetDispatcher dispatcher.PacketDispatcher
|
||||
tcpListener *hub.TCPHub
|
||||
udpHub *hub.UDPHub
|
||||
listeningPort v2net.Port
|
||||
}
|
||||
|
||||
func NewDokodemoDoor(space app.Space, config *Config) *DokodemoDoor {
|
||||
func NewDokodemoDoor(config *Config, packetDispatcher dispatcher.PacketDispatcher) *DokodemoDoor {
|
||||
return &DokodemoDoor{
|
||||
config: config,
|
||||
space: space,
|
||||
address: config.Address,
|
||||
port: config.Port,
|
||||
config: config,
|
||||
packetDispatcher: packetDispatcher,
|
||||
address: config.Address,
|
||||
port: config.Port,
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,7 +95,7 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error {
|
||||
|
||||
func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) {
|
||||
packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), payload, false)
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
close(ray.InboundInput())
|
||||
|
||||
for resp := range ray.InboundOutput() {
|
||||
@ -127,7 +127,7 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) {
|
||||
defer conn.Close()
|
||||
|
||||
packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true)
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
|
||||
var inputFinish, outputFinish sync.Mutex
|
||||
inputFinish.Lock()
|
||||
|
@ -2,6 +2,7 @@ package dokodemo
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
"github.com/v2ray/v2ray-core/proxy/internal"
|
||||
)
|
||||
@ -10,6 +11,11 @@ func init() {
|
||||
internal.MustRegisterInboundHandlerCreator("dokodemo-door",
|
||||
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
|
||||
config := rawConfig.(*Config)
|
||||
return NewDokodemoDoor(space, config), nil
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
return nil, internal.ErrorBadConfiguration
|
||||
}
|
||||
return NewDokodemoDoor(
|
||||
config,
|
||||
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
|
||||
})
|
||||
}
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
@ -14,7 +13,6 @@ import (
|
||||
)
|
||||
|
||||
type FreedomConnection struct {
|
||||
space app.Space
|
||||
}
|
||||
|
||||
func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
|
||||
@ -77,19 +75,6 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
|
||||
v2io.RawReaderToChan(output, conn)
|
||||
}()
|
||||
|
||||
if this.space.HasDnsCache() {
|
||||
if firstPacket.Destination().Address().IsDomain() {
|
||||
domain := firstPacket.Destination().Address().Domain()
|
||||
addr := conn.RemoteAddr()
|
||||
switch typedAddr := addr.(type) {
|
||||
case *net.TCPAddr:
|
||||
this.space.DnsCache().Add(domain, typedAddr.IP)
|
||||
case *net.UDPAddr:
|
||||
this.space.DnsCache().Add(domain, typedAddr.IP)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writeMutex.Lock()
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
tcpConn.CloseWrite()
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"golang.org/x/net/proxy"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
@ -50,7 +51,7 @@ func TestUDPSend(t *testing.T) {
|
||||
|
||||
protocol, err := proxytesting.RegisterInboundConnectionHandlerCreator("mock_ich",
|
||||
func(space app.Space, config interface{}) (v2proxy.InboundHandler, error) {
|
||||
ich.Space = space
|
||||
ich.PacketDispatcher = space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
|
||||
return ich, nil
|
||||
})
|
||||
assert.Error(err).IsNil()
|
||||
|
@ -9,6 +9,6 @@ import (
|
||||
func init() {
|
||||
internal.MustRegisterOutboundHandlerCreator("freedom",
|
||||
func(space app.Space, config interface{}) (proxy.OutboundHandler, error) {
|
||||
return &FreedomConnection{space: space}, nil
|
||||
return &FreedomConnection{}, nil
|
||||
})
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
@ -22,17 +22,17 @@ import (
|
||||
|
||||
type HttpProxyServer struct {
|
||||
sync.Mutex
|
||||
accepting bool
|
||||
space app.Space
|
||||
config *Config
|
||||
tcpListener *hub.TCPHub
|
||||
listeningPort v2net.Port
|
||||
accepting bool
|
||||
packetDispatcher dispatcher.PacketDispatcher
|
||||
config *Config
|
||||
tcpListener *hub.TCPHub
|
||||
listeningPort v2net.Port
|
||||
}
|
||||
|
||||
func NewHttpProxyServer(space app.Space, config *Config) *HttpProxyServer {
|
||||
func NewHttpProxyServer(config *Config, packetDispatcher dispatcher.PacketDispatcher) *HttpProxyServer {
|
||||
return &HttpProxyServer{
|
||||
space: space,
|
||||
config: config,
|
||||
packetDispatcher: packetDispatcher,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@ -144,7 +144,7 @@ func (this *HttpProxyServer) handleConnect(request *http.Request, destination v2
|
||||
buffer.Release()
|
||||
|
||||
packet := v2net.NewPacket(destination, nil, true)
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
this.transport(reader, writer, ray)
|
||||
}
|
||||
|
||||
@ -220,7 +220,7 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
|
||||
log.Debug("Request to remote:\n", serial.BytesLiteral(requestBuffer.Value))
|
||||
|
||||
packet := v2net.NewPacket(dest, requestBuffer, true)
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
defer close(ray.InboundInput())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
@ -2,6 +2,7 @@ package http
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
"github.com/v2ray/v2ray-core/proxy/internal"
|
||||
)
|
||||
@ -9,6 +10,11 @@ import (
|
||||
func init() {
|
||||
internal.MustRegisterInboundHandlerCreator("http",
|
||||
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
|
||||
return NewHttpProxyServer(space, rawConfig.(*Config)), nil
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
return nil, internal.ErrorBadConfiguration
|
||||
}
|
||||
return NewHttpProxyServer(
|
||||
rawConfig.(*Config),
|
||||
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
|
||||
})
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
@ -19,12 +20,19 @@ import (
|
||||
)
|
||||
|
||||
type Shadowsocks struct {
|
||||
space app.Space
|
||||
config *Config
|
||||
port v2net.Port
|
||||
accepting bool
|
||||
tcpHub *hub.TCPHub
|
||||
udpHub *hub.UDPHub
|
||||
packetDispatcher dispatcher.PacketDispatcher
|
||||
config *Config
|
||||
port v2net.Port
|
||||
accepting bool
|
||||
tcpHub *hub.TCPHub
|
||||
udpHub *hub.UDPHub
|
||||
}
|
||||
|
||||
func NewShadowsocks(config *Config, packetDispatcher dispatcher.PacketDispatcher) *Shadowsocks {
|
||||
return &Shadowsocks{
|
||||
config: config,
|
||||
packetDispatcher: packetDispatcher,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Shadowsocks) Port() v2net.Port {
|
||||
@ -99,7 +107,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D
|
||||
log.Info("Shadowsocks: Tunnelling request to ", dest)
|
||||
|
||||
packet := v2net.NewPacket(dest, request.UDPPayload, false)
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
close(ray.InboundInput())
|
||||
|
||||
for respChunk := range ray.InboundOutput() {
|
||||
@ -174,7 +182,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
|
||||
log.Info("Shadowsocks: Tunnelling request to ", dest)
|
||||
|
||||
packet := v2net.NewPacket(dest, nil, true)
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
|
||||
var writeFinish sync.Mutex
|
||||
writeFinish.Lock()
|
||||
@ -215,10 +223,11 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
|
||||
func init() {
|
||||
internal.MustRegisterInboundHandlerCreator("shadowsocks",
|
||||
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
|
||||
config := rawConfig.(*Config)
|
||||
return &Shadowsocks{
|
||||
space: space,
|
||||
config: config,
|
||||
}, nil
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
return nil, internal.ErrorBadConfiguration
|
||||
}
|
||||
return NewShadowsocks(
|
||||
rawConfig.(*Config),
|
||||
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
|
||||
})
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
@ -24,21 +24,21 @@ var (
|
||||
|
||||
// SocksServer is a SOCKS 5 proxy server
|
||||
type SocksServer struct {
|
||||
tcpMutex sync.RWMutex
|
||||
udpMutex sync.RWMutex
|
||||
accepting bool
|
||||
space app.Space
|
||||
config *Config
|
||||
tcpListener *hub.TCPHub
|
||||
udpConn *net.UDPConn
|
||||
udpAddress v2net.Destination
|
||||
listeningPort v2net.Port
|
||||
tcpMutex sync.RWMutex
|
||||
udpMutex sync.RWMutex
|
||||
accepting bool
|
||||
packetDispatcher dispatcher.PacketDispatcher
|
||||
config *Config
|
||||
tcpListener *hub.TCPHub
|
||||
udpConn *net.UDPConn
|
||||
udpAddress v2net.Destination
|
||||
listeningPort v2net.Port
|
||||
}
|
||||
|
||||
func NewSocksServer(space app.Space, config *Config) *SocksServer {
|
||||
func NewSocksServer(config *Config, packetDispatcher dispatcher.PacketDispatcher) *SocksServer {
|
||||
return &SocksServer{
|
||||
space: space,
|
||||
config: config,
|
||||
config: config,
|
||||
packetDispatcher: packetDispatcher,
|
||||
}
|
||||
}
|
||||
|
||||
@ -262,7 +262,7 @@ func (this *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth p
|
||||
}
|
||||
|
||||
func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPacket v2net.Packet) {
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(firstPacket)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(firstPacket)
|
||||
input := ray.InboundInput()
|
||||
output := ray.InboundOutput()
|
||||
|
||||
|
@ -2,6 +2,7 @@ package socks
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
"github.com/v2ray/v2ray-core/proxy/internal"
|
||||
)
|
||||
@ -9,6 +10,11 @@ import (
|
||||
func init() {
|
||||
internal.MustRegisterInboundHandlerCreator("socks",
|
||||
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
|
||||
return NewSocksServer(space, rawConfig.(*Config)), nil
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
return nil, internal.ErrorBadConfiguration
|
||||
}
|
||||
return NewSocksServer(
|
||||
rawConfig.(*Config),
|
||||
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
|
||||
})
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func (this *SocksServer) AcceptPackets() error {
|
||||
}
|
||||
|
||||
func (this *SocksServer) handlePacket(packet v2net.Packet, clientAddr *net.UDPAddr, targetAddr v2net.Address, port v2net.Port) {
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.packetDispatcher.DispatchToOutbound(packet)
|
||||
close(ray.InboundInput())
|
||||
|
||||
for data := range ray.InboundOutput() {
|
||||
|
@ -4,16 +4,16 @@ import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
)
|
||||
|
||||
type InboundConnectionHandler struct {
|
||||
port v2net.Port
|
||||
Space app.Space
|
||||
ConnInput io.Reader
|
||||
ConnOutput io.Writer
|
||||
port v2net.Port
|
||||
PacketDispatcher dispatcher.PacketDispatcher
|
||||
ConnInput io.Reader
|
||||
ConnOutput io.Writer
|
||||
}
|
||||
|
||||
func (this *InboundConnectionHandler) Listen(port v2net.Port) error {
|
||||
@ -30,7 +30,7 @@ func (this *InboundConnectionHandler) Close() {
|
||||
}
|
||||
|
||||
func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
|
||||
ray := this.Space.PacketDispatcher().DispatchToOutbound(packet)
|
||||
ray := this.PacketDispatcher.DispatchToOutbound(packet)
|
||||
|
||||
input := ray.InboundInput()
|
||||
output := ray.InboundOutput()
|
||||
|
@ -17,9 +17,8 @@ func (this *VMessInboundHandler) generateCommand(buffer *alloc.Buffer) {
|
||||
if this.features != nil && this.features.Detour != nil {
|
||||
|
||||
tag := this.features.Detour.ToTag
|
||||
if this.space.HasInboundHandlerManager() {
|
||||
handlerManager := this.space.InboundHandlerManager()
|
||||
handler, availableMin := handlerManager.GetHandler(tag)
|
||||
if this.inboundHandlerManager != nil {
|
||||
handler, availableMin := this.inboundHandlerManager.GetHandler(tag)
|
||||
inboundHandler, ok := handler.(*VMessInboundHandler)
|
||||
if ok {
|
||||
if availableMin > 255 {
|
||||
|
@ -6,6 +6,8 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/app/proxyman"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2crypto "github.com/v2ray/v2ray-core/common/crypto"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
@ -22,13 +24,14 @@ import (
|
||||
// Inbound connection handler that handles messages in VMess format.
|
||||
type VMessInboundHandler struct {
|
||||
sync.Mutex
|
||||
space app.Space
|
||||
clients protocol.UserSet
|
||||
user *vmess.User
|
||||
accepting bool
|
||||
listener *hub.TCPHub
|
||||
features *FeaturesConfig
|
||||
listeningPort v2net.Port
|
||||
packetDispatcher dispatcher.PacketDispatcher
|
||||
inboundHandlerManager proxyman.InboundHandlerManager
|
||||
clients protocol.UserSet
|
||||
user *vmess.User
|
||||
accepting bool
|
||||
listener *hub.TCPHub
|
||||
features *FeaturesConfig
|
||||
listeningPort v2net.Port
|
||||
}
|
||||
|
||||
func (this *VMessInboundHandler) Port() v2net.Port {
|
||||
@ -86,7 +89,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
|
||||
log.Access(connection.RemoteAddr(), request.Address, log.AccessAccepted, serial.StringLiteral(""))
|
||||
log.Debug("VMessIn: Received request for ", request.Address)
|
||||
|
||||
ray := this.space.PacketDispatcher().DispatchToOutbound(v2net.NewPacket(request.Destination(), nil, true))
|
||||
ray := this.packetDispatcher.DispatchToOutbound(v2net.NewPacket(< |