diff --git a/app/reverse/bridge.go b/app/reverse/bridge.go new file mode 100644 index 000000000..077a3ff86 --- /dev/null +++ b/app/reverse/bridge.go @@ -0,0 +1,172 @@ +package reverse + +import ( + "context" + "time" + + "github.com/golang/protobuf/proto" + "v2ray.com/core/common/mux" + "v2ray.com/core/common/net" + "v2ray.com/core/common/session" + "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/routing" + "v2ray.com/core/transport/pipe" +) + +type Bridge struct { + dispatcher routing.Dispatcher + tag string + domain string + workers []*BridgeWorker + monitorTask *task.Periodic +} + +func NewBridge(config *BridgeConfig, dispatcher routing.Dispatcher) (*Bridge, error) { + if len(config.Tag) == 0 { + return nil, newError("bridge tag is empty") + } + if len(config.Domain) == 0 { + return nil, newError("bridge domain is empty") + } + + b := &Bridge{ + dispatcher: dispatcher, + } + b.monitorTask = &task.Periodic{ + Execute: b.monitor, + Interval: time.Second * 2, + } + return b, nil +} + +func (b *Bridge) monitor() error { + var numConnections uint32 + var numWorker uint32 + + for _, w := range b.workers { + if w.IsActive() { + numConnections += w.Connections() + numWorker++ + } + } + + if numWorker == 0 || numConnections/numWorker > 16 { + worker, err := NewBridgeWorker(b.domain, b.tag, b.dispatcher) + if err != nil { + newError("failed to create bridge worker").Base(err).AtWarning().WriteToLog() + return nil + } + b.workers = append(b.workers, worker) + } + + return nil +} + +func (b *Bridge) Start() error { + return b.monitorTask.Start() +} + +func (b *Bridge) Close() error { + return b.monitorTask.Close() +} + +type BridgeWorker struct { + tag string + worker *mux.ServerWorker + dispatcher routing.Dispatcher + state Control_State +} + +func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) { + ctx := context.Background() + ctx = session.ContextWithInbound(ctx, &session.Inbound{ + Tag: tag, + }) + link, err := d.Dispatch(ctx, net.Destination{ + Network: net.Network_TCP, + Address: net.DomainAddress(domain), + Port: 0, + }) + if err != nil { + return nil, err + } + + w := &BridgeWorker{ + dispatcher: d, + tag: tag, + } + + worker, err := mux.NewServerWorker(context.Background(), w, link) + if err != nil { + return nil, err + } + w.worker = worker + + return w, nil +} + +func (w *BridgeWorker) Type() interface{} { + return routing.DispatcherType() +} + +func (w *BridgeWorker) Start() error { + return nil +} + +func (w *BridgeWorker) Close() error { + return nil +} + +func (w *BridgeWorker) IsActive() bool { + return w.state == Control_ACTIVE +} + +func (w *BridgeWorker) Connections() uint32 { + return w.worker.ActiveConnections() +} + +func (w *BridgeWorker) handleInternalConn(link vio.Link) { + go func() { + reader := link.Reader + for { + mb, err := reader.ReadMultiBuffer() + if err != nil { + break + } + for _, b := range mb { + var ctl Control + if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil { + newError("failed to parse proto message").Base(err).WriteToLog() + break + } + if ctl.State != w.state { + w.state = ctl.State + } + } + } + }() +} + +func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { + if !isInternalDomain(dest) { + ctx = session.ContextWithInbound(ctx, &session.Inbound{ + Tag: w.tag, + }) + return w.dispatcher.Dispatch(ctx, dest) + } + + opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)} + uplinkReader, uplinkWriter := pipe.New(opt...) + downlinkReader, downlinkWriter := pipe.New(opt...) + + w.handleInternalConn(vio.Link{ + Reader: downlinkReader, + Writer: uplinkWriter, + }) + + return &vio.Link{ + Reader: uplinkReader, + Writer: downlinkWriter, + }, nil +} diff --git a/app/reverse/config.go b/app/reverse/config.go new file mode 100644 index 000000000..b80802374 --- /dev/null +++ b/app/reverse/config.go @@ -0,0 +1,14 @@ +package reverse + +import ( + "crypto/rand" + "io" + + "v2ray.com/core/common/dice" +) + +func (c *Control) FillInRandom() { + randomLength := dice.Roll(64) + c.Random = make([]byte, randomLength) + io.ReadFull(rand.Reader, c.Random) +} diff --git a/app/reverse/config.pb.go b/app/reverse/config.pb.go new file mode 100644 index 000000000..91d31e187 --- /dev/null +++ b/app/reverse/config.pb.go @@ -0,0 +1,267 @@ +package reverse + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Control_State int32 + +const ( + Control_ACTIVE Control_State = 0 + Control_DRAIN Control_State = 1 +) + +var Control_State_name = map[int32]string{ + 0: "ACTIVE", + 1: "DRAIN", +} + +var Control_State_value = map[string]int32{ + "ACTIVE": 0, + "DRAIN": 1, +} + +func (x Control_State) String() string { + return proto.EnumName(Control_State_name, int32(x)) +} + +func (Control_State) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_829a0eeb60380cbc, []int{0, 0} +} + +type Control struct { + State Control_State `protobuf:"varint,1,opt,name=state,proto3,enum=v2ray.core.app.reverse.Control_State" json:"state,omitempty"` + Random []byte `protobuf:"bytes,99,opt,name=random,proto3" json:"random,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Control) Reset() { *m = Control{} } +func (m *Control) String() string { return proto.CompactTextString(m) } +func (*Control) ProtoMessage() {} +func (*Control) Descriptor() ([]byte, []int) { + return fileDescriptor_829a0eeb60380cbc, []int{0} +} + +func (m *Control) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Control.Unmarshal(m, b) +} +func (m *Control) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Control.Marshal(b, m, deterministic) +} +func (m *Control) XXX_Merge(src proto.Message) { + xxx_messageInfo_Control.Merge(m, src) +} +func (m *Control) XXX_Size() int { + return xxx_messageInfo_Control.Size(m) +} +func (m *Control) XXX_DiscardUnknown() { + xxx_messageInfo_Control.DiscardUnknown(m) +} + +var xxx_messageInfo_Control proto.InternalMessageInfo + +func (m *Control) GetState() Control_State { + if m != nil { + return m.State + } + return Control_ACTIVE +} + +func (m *Control) GetRandom() []byte { + if m != nil { + return m.Random + } + return nil +} + +type BridgeConfig struct { + Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` + Domain string `protobuf:"bytes,2,opt,name=domain,proto3" json:"domain,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *BridgeConfig) Reset() { *m = BridgeConfig{} } +func (m *BridgeConfig) String() string { return proto.CompactTextString(m) } +func (*BridgeConfig) ProtoMessage() {} +func (*BridgeConfig) Descriptor() ([]byte, []int) { + return fileDescriptor_829a0eeb60380cbc, []int{1} +} + +func (m *BridgeConfig) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BridgeConfig.Unmarshal(m, b) +} +func (m *BridgeConfig) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BridgeConfig.Marshal(b, m, deterministic) +} +func (m *BridgeConfig) XXX_Merge(src proto.Message) { + xxx_messageInfo_BridgeConfig.Merge(m, src) +} +func (m *BridgeConfig) XXX_Size() int { + return xxx_messageInfo_BridgeConfig.Size(m) +} +func (m *BridgeConfig) XXX_DiscardUnknown() { + xxx_messageInfo_BridgeConfig.DiscardUnknown(m) +} + +var xxx_messageInfo_BridgeConfig proto.InternalMessageInfo + +func (m *BridgeConfig) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +func (m *BridgeConfig) GetDomain() string { + if m != nil { + return m.Domain + } + return "" +} + +type PortalConfig struct { + Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` + Domain string `protobuf:"bytes,2,opt,name=domain,proto3" json:"domain,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PortalConfig) Reset() { *m = PortalConfig{} } +func (m *PortalConfig) String() string { return proto.CompactTextString(m) } +func (*PortalConfig) ProtoMessage() {} +func (*PortalConfig) Descriptor() ([]byte, []int) { + return fileDescriptor_829a0eeb60380cbc, []int{2} +} + +func (m *PortalConfig) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PortalConfig.Unmarshal(m, b) +} +func (m *PortalConfig) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PortalConfig.Marshal(b, m, deterministic) +} +func (m *PortalConfig) XXX_Merge(src proto.Message) { + xxx_messageInfo_PortalConfig.Merge(m, src) +} +func (m *PortalConfig) XXX_Size() int { + return xxx_messageInfo_PortalConfig.Size(m) +} +func (m *PortalConfig) XXX_DiscardUnknown() { + xxx_messageInfo_PortalConfig.DiscardUnknown(m) +} + +var xxx_messageInfo_PortalConfig proto.InternalMessageInfo + +func (m *PortalConfig) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +func (m *PortalConfig) GetDomain() string { + if m != nil { + return m.Domain + } + return "" +} + +type Config struct { + BridgeConfig []*BridgeConfig `protobuf:"bytes,1,rep,name=bridge_config,json=bridgeConfig,proto3" json:"bridge_config,omitempty"` + PortalConfig []*PortalConfig `protobuf:"bytes,2,rep,name=portal_config,json=portalConfig,proto3" json:"portal_config,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} +func (*Config) Descriptor() ([]byte, []int) { + return fileDescriptor_829a0eeb60380cbc, []int{3} +} + +func (m *Config) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Config.Unmarshal(m, b) +} +func (m *Config) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Config.Marshal(b, m, deterministic) +} +func (m *Config) XXX_Merge(src proto.Message) { + xxx_messageInfo_Config.Merge(m, src) +} +func (m *Config) XXX_Size() int { + return xxx_messageInfo_Config.Size(m) +} +func (m *Config) XXX_DiscardUnknown() { + xxx_messageInfo_Config.DiscardUnknown(m) +} + +var xxx_messageInfo_Config proto.InternalMessageInfo + +func (m *Config) GetBridgeConfig() []*BridgeConfig { + if m != nil { + return m.BridgeConfig + } + return nil +} + +func (m *Config) GetPortalConfig() []*PortalConfig { + if m != nil { + return m.PortalConfig + } + return nil +} + +func init() { + proto.RegisterEnum("v2ray.core.app.reverse.Control_State", Control_State_name, Control_State_value) + proto.RegisterType((*Control)(nil), "v2ray.core.app.reverse.Control") + proto.RegisterType((*BridgeConfig)(nil), "v2ray.core.app.reverse.BridgeConfig") + proto.RegisterType((*PortalConfig)(nil), "v2ray.core.app.reverse.PortalConfig") + proto.RegisterType((*Config)(nil), "v2ray.core.app.reverse.Config") +} + +func init() { + proto.RegisterFile("v2ray.com/core/app/reverse/config.proto", fileDescriptor_829a0eeb60380cbc) +} + +var fileDescriptor_829a0eeb60380cbc = []byte{ + // 310 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0xc1, 0x4b, 0xfb, 0x30, + 0x14, 0xc7, 0x7f, 0xd9, 0x58, 0xc7, 0xde, 0xaf, 0x93, 0x91, 0xc3, 0xe8, 0x41, 0x64, 0x0c, 0xc5, + 0x9d, 0x52, 0xa8, 0x17, 0xc1, 0xd3, 0xd6, 0x79, 0xe8, 0x45, 0x46, 0x94, 0x1d, 0xbc, 0x48, 0xd6, + 0xc5, 0x52, 0x58, 0xfb, 0x42, 0x16, 0x86, 0xbd, 0xf8, 0xa7, 0xf8, 0x07, 0xf8, 0x57, 0x4a, 0xd3, + 0x14, 0x7a, 0x50, 0xc1, 0xdb, 0x7b, 0xc9, 0xe7, 0xf3, 0xf2, 0x4d, 0x02, 0xd7, 0xa7, 0x48, 0x8b, + 0x8a, 0xa5, 0x58, 0x84, 0x29, 0x6a, 0x19, 0x0a, 0xa5, 0x42, 0x2d, 0x4f, 0x52, 0x1f, 0x65, 0x98, + 0x62, 0xf9, 0x9a, 0x67, 0x4c, 0x69, 0x34, 0x48, 0xa7, 0x2d, 0xa8, 0x25, 0x13, 0x4a, 0x31, 0x07, + 0xcd, 0xdf, 0x61, 0x18, 0x63, 0x69, 0x34, 0x1e, 0xe8, 0x1d, 0x0c, 0x8e, 0x46, 0x18, 0x19, 0x90, + 0x19, 0x59, 0x9c, 0x45, 0x57, 0xec, 0x7b, 0x85, 0x39, 0x9e, 0x3d, 0xd6, 0x30, 0x6f, 0x1c, 0x3a, + 0x05, 0x4f, 0x8b, 0x72, 0x8f, 0x45, 0x90, 0xce, 0xc8, 0xc2, 0xe7, 0xae, 0x9b, 0x5f, 0xc0, 0xc0, + 0x72, 0x14, 0xc0, 0x5b, 0xc6, 0x4f, 0xc9, 0xf6, 0x7e, 0xf2, 0x8f, 0x8e, 0x60, 0xb0, 0xe6, 0xcb, + 0xe4, 0x61, 0x42, 0xe6, 0xb7, 0xe0, 0xaf, 0x74, 0xbe, 0xcf, 0x64, 0x6c, 0xd3, 0xd2, 0x09, 0xf4, + 0x8d, 0xc8, 0x6c, 0x84, 0x11, 0xaf, 0xcb, 0x7a, 0xf2, 0x1e, 0x0b, 0x91, 0x97, 0x41, 0xcf, 0x2e, + 0xba, 0xae, 0x36, 0x37, 0xa8, 0x8d, 0x38, 0xfc, 0xd9, 0xfc, 0x20, 0xe0, 0x39, 0x29, 0x81, 0xf1, + 0xce, 0x1e, 0xff, 0xd2, 0xbc, 0x56, 0x40, 0x66, 0xfd, 0xc5, 0xff, 0xe8, 0xf2, 0xa7, 0xbb, 0x77, + 0xb3, 0x72, 0x7f, 0xd7, 0x4d, 0x9e, 0xc0, 0x58, 0xd9, 0x3c, 0xed, 0xa8, 0xde, 0xef, 0xa3, 0xba, + 0xe1, 0xb9, 0xaf, 0x3a, 0xdd, 0x6a, 0x0d, 0xe7, 0x29, 0x16, 0x5d, 0x51, 0x69, 0x7c, 0xab, 0x5a, + 0x75, 0x43, 0x9e, 0x87, 0xae, 0xfc, 0xec, 0x05, 0xdb, 0x88, 0x8b, 0x8a, 0xc5, 0x35, 0xb5, 0xb1, + 0x14, 0x6f, 0xb6, 0x76, 0x9e, 0xfd, 0xf9, 0x9b, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb7, 0x83, + 0x30, 0xdb, 0x24, 0x02, 0x00, 0x00, +} diff --git a/app/reverse/config.proto b/app/reverse/config.proto new file mode 100644 index 000000000..9cf649d89 --- /dev/null +++ b/app/reverse/config.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package v2ray.core.app.reverse; +option csharp_namespace = "V2Ray.Core.Proxy.Reverse"; +option go_package = "reverse"; +option java_package = "com.v2ray.core.proxy.reverse"; +option java_multiple_files = true; + +message Control { + enum State { + ACTIVE = 0; + DRAIN = 1; + } + + State state = 1; + bytes random = 99; +} + +message BridgeConfig { + string tag = 1; + string domain = 2; + +} + +message PortalConfig { + string tag = 1; + string domain = 2; +} + +message Config { + repeated BridgeConfig bridge_config = 1; + repeated PortalConfig portal_config = 2; +} diff --git a/app/reverse/errors.generated.go b/app/reverse/errors.generated.go new file mode 100644 index 000000000..e72207995 --- /dev/null +++ b/app/reverse/errors.generated.go @@ -0,0 +1,9 @@ +package reverse + +import "v2ray.com/core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/app/reverse/portal.go b/app/reverse/portal.go new file mode 100644 index 000000000..12baece8a --- /dev/null +++ b/app/reverse/portal.go @@ -0,0 +1,242 @@ +package reverse + +import ( + "context" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "v2ray.com/core/common" + "v2ray.com/core/common/buf" + "v2ray.com/core/common/dice" + "v2ray.com/core/common/mux" + "v2ray.com/core/common/session" + "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/outbound" + "v2ray.com/core/transport/pipe" +) + +type Portal struct { + ohm outbound.Manager + tag string + domain string + picker *StaticMuxPicker + client *mux.ClientManager +} + +func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) { + if len(config.Tag) == 0 { + return nil, newError("portal tag is empty") + } + + if len(config.Domain) == 0 { + return nil, newError("portal domain is empty") + } + + picker, err := NewStaticMuxPicker() + if err != nil { + return nil, err + } + + return &Portal{ + ohm: ohm, + tag: config.Tag, + domain: config.Domain, + picker: picker, + client: &mux.ClientManager{ + Picker: picker, + }, + }, nil +} + +func (p *Portal) Start() error { + return p.ohm.AddHandler(context.Background(), &Outbound{ + portal: p, + }) +} + +func (p *Portal) Close() error { + return p.ohm.RemoveHandler(context.Background(), p.tag) +} + +func (s *Portal) HandleConnection(ctx context.Context, link *vio.Link) error { + outboundMeta := session.OutboundFromContext(ctx) + if outboundMeta == nil { + return newError("outbound metadata not found").AtError() + } + + if isInternalDomain(outboundMeta.Target) { + muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{ + MaxConcurrency: 0, + MaxConnection: 256, + }) + if err != nil { + return newError("failed to create mux client worker").Base(err).AtWarning() + } + + worker, err := NewPortalWorker(muxClient) + if err != nil { + return newError("failed to create portal worker").Base(err) + } + + s.picker.AddWorker(worker) + return nil + } + + return s.client.Dispatch(ctx, link) +} + +type Outbound struct { + portal *Portal + tag string +} + +func (o *Outbound) Tag() string { + return o.tag +} + +func (o *Outbound) Dispatch(ctx context.Context, link *vio.Link) { + if err := o.portal.HandleConnection(ctx, link); err != nil { + newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx)) + pipe.CloseError(link.Writer) + } +} + +func (o *Outbound) Start() error { + return nil +} + +func (o *Outbound) Close() error { + return nil +} + +type StaticMuxPicker struct { + access sync.Mutex + workers []*PortalWorker + cTask *task.Periodic +} + +func NewStaticMuxPicker() (*StaticMuxPicker, error) { + p := &StaticMuxPicker{} + p.cTask = &task.Periodic{ + Execute: p.cleanup, + Interval: time.Second * 30, + } + p.cTask.Start() + return p, nil +} + +func (p *StaticMuxPicker) cleanup() error { + p.access.Lock() + defer p.access.Unlock() + + var activeWorkers []*PortalWorker + for _, w := range p.workers { + if !w.Closed() { + activeWorkers = append(activeWorkers, w) + } + } + + if len(activeWorkers) != len(p.workers) { + p.workers = activeWorkers + } + + return nil +} + +func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) { + p.access.Lock() + defer p.access.Unlock() + + n := len(p.workers) + if n == 0 { + return nil, newError("empty worker list") + } + + idx := dice.Roll(n) + for i := 0; i < n; i++ { + w := p.workers[(i+idx)%n] + if !w.IsFull() { + return w.client, nil + } + } + + return nil, newError("no mux client worker available") +} + +func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) { + p.access.Lock() + defer p.access.Unlock() + + p.workers = append(p.workers, worker) +} + +type PortalWorker struct { + client *mux.ClientWorker + control *task.Periodic + writer buf.Writer + reader buf.Reader +} + +func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) { + opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)} + uplinkReader, uplinkWriter := pipe.New(opt...) + downlinkReader, downlinkWriter := pipe.New(opt...) + + f := client.Dispatch(context.Background(), &vio.Link{ + Reader: uplinkReader, + Writer: downlinkWriter, + }) + if !f { + return nil, newError("unable to dispatch control connection") + } + w := &PortalWorker{ + client: client, + reader: downlinkReader, + writer: uplinkWriter, + } + w.control = &task.Periodic{ + Execute: w.heartbeat, + Interval: time.Second * 2, + } + w.control.Start() + return w, nil +} + +func (w *PortalWorker) heartbeat() error { + if w.client.Closed() { + return newError("client worker stopped") + } + + if w.writer == nil { + return newError("already disposed") + } + + msg := &Control{} + msg.FillInRandom() + + if w.client.IsClosing() { + msg.State = Control_DRAIN + + defer func() { + common.Close(w.writer) + pipe.CloseError(w.reader) + w.writer = nil + }() + } + + b, err := proto.Marshal(msg) + common.Must(err) + var mb buf.MultiBuffer + common.Must2(mb.Write(b)) + return w.writer.WriteMultiBuffer(mb) +} + +func (w *PortalWorker) IsFull() bool { + return w.client.IsFull() +} + +func (w *PortalWorker) Closed() bool { + return w.client.Closed() +} diff --git a/app/reverse/portal_test.go b/app/reverse/portal_test.go new file mode 100644 index 000000000..aa4660a5b --- /dev/null +++ b/app/reverse/portal_test.go @@ -0,0 +1,20 @@ +package reverse_test + +import ( + "testing" + + "v2ray.com/core/app/reverse" + "v2ray.com/core/common" +) + +func TestStaticPickerEmpty(t *testing.T) { + picker, err := reverse.NewStaticMuxPicker() + common.Must(err) + worker, err := picker.PickAvailable() + if err == nil { + t.Error("expected error, but nil") + } + if worker != nil { + t.Error("expected nil worker, but not nil") + } +} diff --git a/app/reverse/reverse.go b/app/reverse/reverse.go new file mode 100644 index 000000000..194ccae14 --- /dev/null +++ b/app/reverse/reverse.go @@ -0,0 +1,58 @@ +package reverse + +//go:generate errorgen + +import ( + "context" + + "v2ray.com/core" + "v2ray.com/core/common" + "v2ray.com/core/common/net" + "v2ray.com/core/features/outbound" + "v2ray.com/core/features/routing" +) + +const ( + internalDomain = "reverse.internal.v2ray.com" +) + +func isInternalDomain(dest net.Destination) bool { + return dest.Address.Family().IsDomain() && dest.Address.Domain() == internalDomain +} + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + r := new(Reverse) + if err := core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { + return r.Init(config.(*Config), d, om) + }); err != nil { + return nil, err + } + return r, nil + })) +} + +type Reverse struct { + bridges []*Bridge + portals []*Portal +} + +func (r *Reverse) Init(config *Config, d routing.Dispatcher, ohm outbound.Manager) error { + for _, bConfig := range config.BridgeConfig { + b, err := NewBridge(bConfig, d) + if err != nil { + return err + } + r.bridges = append(r.bridges, b) + } + + for _, pConfig := range config.PortalConfig { + p, err := NewPortal(pConfig, ohm) + if err != nil { + return err + } + r.portals = append(r.portals, p) + } + + return nil +} diff --git a/common/mux/server.go b/common/mux/server.go index 921b19d69..68eac8f1c 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -92,6 +92,10 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { s.Close() } +func (w *ServerWorker) ActiveConnections() uint32 { + return uint32(w.sessionManager.Size()) +} + func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error { if meta.Option.Has(OptionData) { return buf.Copy(NewStreamReader(reader), buf.Discard) diff --git a/main/distro/all/all.go b/main/distro/all/all.go index 84602f1e2..a8a699084 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -18,6 +18,7 @@ import ( _ "v2ray.com/core/app/dns" _ "v2ray.com/core/app/log" _ "v2ray.com/core/app/policy" + _ "v2ray.com/core/app/reverse" _ "v2ray.com/core/app/router" _ "v2ray.com/core/app/stats"