From 1e6d5561cc9ef279ffe9a061408f712be012a724 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Mon, 5 Feb 2018 23:38:24 +0100 Subject: [PATCH] prototype of commander --- app/commander/commander.go | 63 +++ app/commander/config.pb.go | 51 +++ app/commander/config.proto | 11 + app/commander/errors.generated.go | 5 + app/commander/outbound.go | 65 +++ app/proxyman/command/command.go | 139 ++++++ app/proxyman/command/command.pb.go | 520 +++++++++++++++++++++++ app/proxyman/command/command.proto | 80 ++++ app/proxyman/command/doc.go | 3 + app/proxyman/command/errors.generated.go | 5 + app/proxyman/inbound/always.go | 4 + app/proxyman/inbound/inbound.go | 45 +- app/proxyman/outbound/handler.go | 6 +- app/proxyman/outbound/outbound.go | 15 + dial.go | 2 +- network.go | 6 + proxy/proxy.go | 18 + proxy/vmess/inbound/inbound.go | 4 + transport/ray/connection.go | 71 +++- v2ray.go | 17 +- 20 files changed, 1097 insertions(+), 33 deletions(-) create mode 100644 app/commander/commander.go create mode 100644 app/commander/config.pb.go create mode 100644 app/commander/config.proto create mode 100644 app/commander/errors.generated.go create mode 100644 app/commander/outbound.go create mode 100644 app/proxyman/command/command.go create mode 100644 app/proxyman/command/command.pb.go create mode 100644 app/proxyman/command/command.proto create mode 100644 app/proxyman/command/doc.go create mode 100644 app/proxyman/command/errors.generated.go diff --git a/app/commander/commander.go b/app/commander/commander.go new file mode 100644 index 000000000..43a4aa929 --- /dev/null +++ b/app/commander/commander.go @@ -0,0 +1,63 @@ +package commander + +//go:generate go run $GOPATH/src/v2ray.com/core/common/errors/errorgen/main.go -pkg commander -path App,Commander + +import ( + "context" + "net" + "sync" + + "google.golang.org/grpc" + "v2ray.com/core" +) + +type Commander struct { + sync.Mutex + server *grpc.Server + config Config + ohm core.OutboundHandlerManager + callbacks []core.ServiceRegistryCallback +} + +func (c *Commander) RegisterService(callback core.ServiceRegistryCallback) { + c.Lock() + defer c.Unlock() + + if callback == nil { + return + } + + c.callbacks = append(c.callbacks, callback) +} + +func (c *Commander) Start() error { + c.Lock() + c.server = grpc.NewServer() + for _, callback := range c.callbacks { + callback(c.server) + } + c.Unlock() + + listener := &OutboundListener{ + buffer: make(chan net.Conn, 4), + } + + c.server.Serve(listener) + + c.ohm.RemoveHandler(context.Background(), c.config.Tag) + c.ohm.AddHandler(context.Background(), &CommanderOutbound{ + tag: c.config.Tag, + listener: listener, + }) + return nil +} + +func (c *Commander) Close() { + c.Lock() + defer c.Unlock() + + if c.server != nil { + c.server.Stop() + c.server = nil + } +} diff --git a/app/commander/config.pb.go b/app/commander/config.pb.go new file mode 100644 index 000000000..dbb64bdf8 --- /dev/null +++ b/app/commander/config.pb.go @@ -0,0 +1,51 @@ +package commander + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Config struct { + Tag string `protobuf:"bytes,1,opt,name=tag" json:"tag,omitempty"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} +func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Config) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +func init() { + proto.RegisterType((*Config)(nil), "v2ray.core.app.commander.Config") +} + +func init() { proto.RegisterFile("v2ray.com/core/app/commander/config.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 143 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x2c, 0x33, 0x2a, 0x4a, + 0xac, 0xd4, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0x2f, 0x4a, 0xd5, 0x4f, 0x2c, 0x28, 0xd0, 0x4f, + 0xce, 0xcf, 0xcd, 0x4d, 0xcc, 0x4b, 0x49, 0x2d, 0xd2, 0x4f, 0xce, 0xcf, 0x4b, 0xcb, 0x4c, 0xd7, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x92, 0x80, 0x29, 0x2d, 0x4a, 0xd5, 0x4b, 0x2c, 0x28, 0xd0, + 0x83, 0x2b, 0x53, 0x92, 0xe2, 0x62, 0x73, 0x06, 0xab, 0x14, 0x12, 0xe0, 0x62, 0x2e, 0x49, 0x4c, + 0x97, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x02, 0x31, 0x9d, 0xdc, 0xb8, 0x64, 0x92, 0xf3, 0x73, + 0xf5, 0x70, 0xe9, 0x0d, 0x60, 0x8c, 0xe2, 0x84, 0x73, 0x56, 0x31, 0x49, 0x84, 0x19, 0x05, 0x25, + 0x56, 0xea, 0x39, 0x83, 0xd4, 0x39, 0x16, 0x14, 0xe8, 0x39, 0xc3, 0xa4, 0x92, 0xd8, 0xc0, 0x8e, + 0x30, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x36, 0x74, 0x98, 0x19, 0xb1, 0x00, 0x00, 0x00, +} diff --git a/app/commander/config.proto b/app/commander/config.proto new file mode 100644 index 000000000..56a7b7745 --- /dev/null +++ b/app/commander/config.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package v2ray.core.app.commander; +option csharp_namespace = "V2Ray.Core.App.Commander"; +option go_package = "commander"; +option java_package = "com.v2ray.core.app.commander"; +option java_multiple_files = true; + +message Config { + string tag = 1; +} diff --git a/app/commander/errors.generated.go b/app/commander/errors.generated.go new file mode 100644 index 000000000..fd7b91797 --- /dev/null +++ b/app/commander/errors.generated.go @@ -0,0 +1,5 @@ +package commander + +import "v2ray.com/core/common/errors" + +func newError(values ...interface{}) *errors.Error { return errors.New(values...).Path("App", "Commander") } diff --git a/app/commander/outbound.go b/app/commander/outbound.go new file mode 100644 index 000000000..1338c7451 --- /dev/null +++ b/app/commander/outbound.go @@ -0,0 +1,65 @@ +package commander + +import ( + "context" + "net" + + "v2ray.com/core/common/signal" + "v2ray.com/core/transport/ray" +) + +type OutboundListener struct { + buffer chan net.Conn +} + +func (l *OutboundListener) add(conn net.Conn) { + select { + case l.buffer <- conn: + default: + conn.Close() + } +} + +func (l *OutboundListener) Accept() (net.Conn, error) { + c, open := <-l.buffer + if !open { + return nil, newError("listener closed") + } + return c, nil +} + +func (l *OutboundListener) Close() error { + close(l.buffer) + return nil +} + +func (l *OutboundListener) Addr() net.Addr { + return &net.TCPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: 0, + } +} + +type CommanderOutbound struct { + tag string + listener *OutboundListener +} + +func (co *CommanderOutbound) Dispatch(ctx context.Context, r ray.OutboundRay) { + closeSignal := signal.NewNotifier() + c := ray.NewConnection(r.OutboundInput(), r.OutboundOutput(), ray.ConnCloseSignal(closeSignal)) + co.listener.add(c) + <-closeSignal.Wait() + + return +} + +func (co *CommanderOutbound) Tag() string { + return co.tag +} + +func (co *CommanderOutbound) Start() error { + return nil +} + +func (co *CommanderOutbound) Close() {} \ No newline at end of file diff --git a/app/proxyman/command/command.go b/app/proxyman/command/command.go new file mode 100644 index 000000000..7ee973c4f --- /dev/null +++ b/app/proxyman/command/command.go @@ -0,0 +1,139 @@ +package command + +import ( + "context" + + grpc "google.golang.org/grpc" + "v2ray.com/core" + "v2ray.com/core/common" + "v2ray.com/core/proxy" +) + +type InboundOperation interface { + ApplyInbound(context.Context, core.InboundHandler) error +} + +type OutboundOperation interface { + ApplyOutbound(context.Context, core.OutboundHandler) error +} + +func (op *AddUserOperation) ApplyInbound(ctx context.Context, handler core.InboundHandler) error { + getInbound, ok := handler.(proxy.GetInbound) + if !ok { + return newError("can't get inbound proxy from handler") + } + p := getInbound.GetInbound() + um, ok := p.(proxy.UserManager) + if !ok { + return newError("proxy is not an UserManager") + } + return um.AddUser(ctx, op.User) +} + +func (op *AddUserOperation) ApplyOutbound(ctx context.Context, handler core.OutboundHandler) error { + getOutbound, ok := handler.(proxy.GetOutbound) + if !ok { + return newError("can't get outbound proxy from handler") + } + p := getOutbound.GetOutbound() + um, ok := p.(proxy.UserManager) + if !ok { + return newError("proxy in not an UserManager") + } + return um.AddUser(ctx, op.User) +} + +type handlerServer struct { + s *core.Instance + ihm core.InboundHandlerManager + ohm core.OutboundHandlerManager +} + +func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) { + rawHandler, err := s.s.CreateObject(request.Inbound) + if err != nil { + return nil, err + } + handler, ok := rawHandler.(core.InboundHandler) + if !ok { + return nil, newError("not an InboundHandler.") + } + return &AddInboundResponse{}, s.ihm.AddHandler(ctx, handler) +} + +func (s *handlerServer) RemoveInbound(ctx context.Context, request *RemoveInboundRequest) (*RemoveInboundResponse, error) { + return &RemoveInboundResponse{}, s.ihm.RemoveHandler(ctx, request.Tag) +} + +func (s *handlerServer) AlterInbound(ctx context.Context, request *AlterInboundRequest) (*AlterInboundResponse, error) { + rawOperation, err := request.Operation.GetInstance() + if err != nil { + return nil, newError("unknown operation").Base(err) + } + operation, ok := rawOperation.(InboundOperation) + if !ok { + return nil, newError("not an inbound operation") + } + + handler, err := s.ihm.GetHandler(ctx, request.Tag) + if err != nil { + return nil, newError("failed to get handler: ", request.Tag).Base(err) + } + + return &AlterInboundResponse{}, operation.ApplyInbound(ctx, handler) +} + +func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundRequest) (*AddOutboundResponse, error) { + rawHandler, err := s.s.CreateObject(request.Outbound) + if err != nil { + return nil, err + } + handler, ok := rawHandler.(core.OutboundHandler) + if !ok { + return nil, newError("not an OutboundHandler.") + } + return &AddOutboundResponse{}, s.ohm.AddHandler(ctx, handler) +} + +func (s *handlerServer) RemoveOutbound(ctx context.Context, request *RemoveOutboundRequest) (*RemoveOutboundResponse, error) { + return &RemoveOutboundResponse{}, s.ohm.RemoveHandler(ctx, request.Tag) +} + +func (s *handlerServer) AlterOutbound(ctx context.Context, request *AlterOutboundRequest) (*AlterOutboundResponse, error) { + rawOperation, err := request.Operation.GetInstance() + if err != nil { + return nil, newError("unknown operation").Base(err) + } + operation, ok := rawOperation.(OutboundOperation) + if !ok { + return nil, newError("not an outbound operation") + } + + handler := s.ohm.GetHandler(request.Tag) + return &AlterOutboundResponse{}, operation.ApplyOutbound(ctx, handler) +} + +type feature struct{} + +func (*feature) Start() error { + return nil +} + +func (*feature) Close() {} + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) { + s := core.FromContext(ctx) + if s == nil { + return nil, newError("V is not in context.") + } + s.Commander().RegisterService(func(server *grpc.Server) { + RegisterHandlerServiceServer(server, &handlerServer{ + s: s, + ihm: s.InboundHandlerManager(), + ohm: s.OutboundHandlerManager(), + }) + }) + return &feature{}, nil + })) +} diff --git a/app/proxyman/command/command.pb.go b/app/proxyman/command/command.pb.go new file mode 100644 index 000000000..fe0c1071d --- /dev/null +++ b/app/proxyman/command/command.pb.go @@ -0,0 +1,520 @@ +package command + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import v2ray_core_common_protocol "v2ray.com/core/common/protocol" +import v2ray_core_common_serial "v2ray.com/core/common/serial" +import v2ray_core "v2ray.com/core" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type AddUserOperation struct { + User *v2ray_core_common_protocol.User `protobuf:"bytes,1,opt,name=user" json:"user,omitempty"` +} + +func (m *AddUserOperation) Reset() { *m = AddUserOperation{} } +func (m *AddUserOperation) String() string { return proto.CompactTextString(m) } +func (*AddUserOperation) ProtoMessage() {} +func (*AddUserOperation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *AddUserOperation) GetUser() *v2ray_core_common_protocol.User { + if m != nil { + return m.User + } + return nil +} + +type RemoveUserOperation struct { + Email string `protobuf:"bytes,1,opt,name=email" json:"email,omitempty"` +} + +func (m *RemoveUserOperation) Reset() { *m = RemoveUserOperation{} } +func (m *RemoveUserOperation) String() string { return proto.CompactTextString(m) } +func (*RemoveUserOperation) ProtoMessage() {} +func (*RemoveUserOperation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *RemoveUserOperation) GetEmail() string { + if m != nil { + return m.Email + } + return "" +} + +type AddInboundRequest struct { + Inbound *v2ray_core.InboundHandlerConfig `protobuf:"bytes,1,opt,name=inbound" json:"inbound,omitempty"` +} + +func (m *AddInboundRequest) Reset() { *m = AddInboundRequest{} } +func (m *AddInboundRequest) String() string { return proto.CompactTextString(m) } +func (*AddInboundRequest) ProtoMessage() {} +func (*AddInboundRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *AddInboundRequest) GetInbound() *v2ray_core.InboundHandlerConfig { + if m != nil { + return m.Inbound + } + return nil +} + +type AddInboundResponse struct { +} + +func (m *AddInboundResponse) Reset() { *m = AddInboundResponse{} } +func (m *AddInboundResponse) String() string { return proto.CompactTextString(m) } +func (*AddInboundResponse) ProtoMessage() {} +func (*AddInboundResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +type RemoveInboundRequest struct { + Tag string `protobuf:"bytes,1,opt,name=tag" json:"tag,omitempty"` +} + +func (m *RemoveInboundRequest) Reset() { *m = RemoveInboundRequest{} } +func (m *RemoveInboundRequest) String() string { return proto.CompactTextString(m) } +func (*RemoveInboundRequest) ProtoMessage() {} +func (*RemoveInboundRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *RemoveInboundRequest) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +type RemoveInboundResponse struct { +} + +func (m *RemoveInboundResponse) Reset() { *m = RemoveInboundResponse{} } +func (m *RemoveInboundResponse) String() string { return proto.CompactTextString(m) } +func (*RemoveInboundResponse) ProtoMessage() {} +func (*RemoveInboundResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +type AlterInboundRequest struct { + Tag string `protobuf:"bytes,1,opt,name=tag" json:"tag,omitempty"` + Operation *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,2,opt,name=operation" json:"operation,omitempty"` +} + +func (m *AlterInboundRequest) Reset() { *m = AlterInboundRequest{} } +func (m *AlterInboundRequest) String() string { return proto.CompactTextString(m) } +func (*AlterInboundRequest) ProtoMessage() {} +func (*AlterInboundRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *AlterInboundRequest) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +func (m *AlterInboundRequest) GetOperation() *v2ray_core_common_serial.TypedMessage { + if m != nil { + return m.Operation + } + return nil +} + +type AlterInboundResponse struct { +} + +func (m *AlterInboundResponse) Reset() { *m = AlterInboundResponse{} } +func (m *AlterInboundResponse) String() string { return proto.CompactTextString(m) } +func (*AlterInboundResponse) ProtoMessage() {} +func (*AlterInboundResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +type AddOutboundRequest struct { + Outbound *v2ray_core.OutboundHandlerConfig `protobuf:"bytes,1,opt,name=outbound" json:"outbound,omitempty"` +} + +func (m *AddOutboundRequest) Reset() { *m = AddOutboundRequest{} } +func (m *AddOutboundRequest) String() string { return proto.CompactTextString(m) } +func (*AddOutboundRequest) ProtoMessage() {} +func (*AddOutboundRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *AddOutboundRequest) GetOutbound() *v2ray_core.OutboundHandlerConfig { + if m != nil { + return m.Outbound + } + return nil +} + +type AddOutboundResponse struct { +} + +func (m *AddOutboundResponse) Reset() { *m = AddOutboundResponse{} } +func (m *AddOutboundResponse) String() string { return proto.CompactTextString(m) } +func (*AddOutboundResponse) ProtoMessage() {} +func (*AddOutboundResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } + +type RemoveOutboundRequest struct { + Tag string `protobuf:"bytes,1,opt,name=tag" json:"tag,omitempty"` +} + +func (m *RemoveOutboundRequest) Reset() { *m = RemoveOutboundRequest{} } +func (m *RemoveOutboundRequest) String() string { return proto.CompactTextString(m) } +func (*RemoveOutboundRequest) ProtoMessage() {} +func (*RemoveOutboundRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } + +func (m *RemoveOutboundRequest) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +type RemoveOutboundResponse struct { +} + +func (m *RemoveOutboundResponse) Reset() { *m = RemoveOutboundResponse{} } +func (m *RemoveOutboundResponse) String() string { return proto.CompactTextString(m) } +func (*RemoveOutboundResponse) ProtoMessage() {} +func (*RemoveOutboundResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } + +type AlterOutboundRequest struct { + Tag string `protobuf:"bytes,1,opt,name=tag" json:"tag,omitempty"` + Operation *v2ray_core_common_serial.TypedMessage `protobuf:"bytes,2,opt,name=operation" json:"operation,omitempty"` +} + +func (m *AlterOutboundRequest) Reset() { *m = AlterOutboundRequest{} } +func (m *AlterOutboundRequest) String() string { return proto.CompactTextString(m) } +func (*AlterOutboundRequest) ProtoMessage() {} +func (*AlterOutboundRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } + +func (m *AlterOutboundRequest) GetTag() string { + if m != nil { + return m.Tag + } + return "" +} + +func (m *AlterOutboundRequest) GetOperation() *v2ray_core_common_serial.TypedMessage { + if m != nil { + return m.Operation + } + return nil +} + +type AlterOutboundResponse struct { +} + +func (m *AlterOutboundResponse) Reset() { *m = AlterOutboundResponse{} } +func (m *AlterOutboundResponse) String() string { return proto.CompactTextString(m) } +func (*AlterOutboundResponse) ProtoMessage() {} +func (*AlterOutboundResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } + +type Config struct { +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} +func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } + +func init() { + proto.RegisterType((*AddUserOperation)(nil), "v2ray.core.app.proxyman.command.AddUserOperation") + proto.RegisterType((*RemoveUserOperation)(nil), "v2ray.core.app.proxyman.command.RemoveUserOperation") + proto.RegisterType((*AddInboundRequest)(nil), "v2ray.core.app.proxyman.command.AddInboundRequest") + proto.RegisterType((*AddInboundResponse)(nil), "v2ray.core.app.proxyman.command.AddInboundResponse") + proto.RegisterType((*RemoveInboundRequest)(nil), "v2ray.core.app.proxyman.command.RemoveInboundRequest") + proto.RegisterType((*RemoveInboundResponse)(nil), "v2ray.core.app.proxyman.command.RemoveInboundResponse") + proto.RegisterType((*AlterInboundRequest)(nil), "v2ray.core.app.proxyman.command.AlterInboundRequest") + proto.RegisterType((*AlterInboundResponse)(nil), "v2ray.core.app.proxyman.command.AlterInboundResponse") + proto.RegisterType((*AddOutboundRequest)(nil), "v2ray.core.app.proxyman.command.AddOutboundRequest") + proto.RegisterType((*AddOutboundResponse)(nil), "v2ray.core.app.proxyman.command.AddOutboundResponse") + proto.RegisterType((*RemoveOutboundRequest)(nil), "v2ray.core.app.proxyman.command.RemoveOutboundRequest") + proto.RegisterType((*RemoveOutboundResponse)(nil), "v2ray.core.app.proxyman.command.RemoveOutboundResponse") + proto.RegisterType((*AlterOutboundRequest)(nil), "v2ray.core.app.proxyman.command.AlterOutboundRequest") + proto.RegisterType((*AlterOutboundResponse)(nil), "v2ray.core.app.proxyman.command.AlterOutboundResponse") + proto.RegisterType((*Config)(nil), "v2ray.core.app.proxyman.command.Config") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for HandlerService service + +type HandlerServiceClient interface { + AddInbound(ctx context.Context, in *AddInboundRequest, opts ...grpc.CallOption) (*AddInboundResponse, error) + RemoveInbound(ctx context.Context, in *RemoveInboundRequest, opts ...grpc.CallOption) (*RemoveInboundResponse, error) + AlterInbound(ctx context.Context, in *AlterInboundRequest, opts ...grpc.CallOption) (*AlterInboundResponse, error) + AddOutbound(ctx context.Context, in *AddOutboundRequest, opts ...grpc.CallOption) (*AddOutboundResponse, error) + RemoveOutbound(ctx context.Context, in *RemoveOutboundRequest, opts ...grpc.CallOption) (*RemoveOutboundResponse, error) + AlterOutbound(ctx context.Context, in *AlterOutboundRequest, opts ...grpc.CallOption) (*AlterOutboundResponse, error) +} + +type handlerServiceClient struct { + cc *grpc.ClientConn +} + +func NewHandlerServiceClient(cc *grpc.ClientConn) HandlerServiceClient { + return &handlerServiceClient{cc} +} + +func (c *handlerServiceClient) AddInbound(ctx context.Context, in *AddInboundRequest, opts ...grpc.CallOption) (*AddInboundResponse, error) { + out := new(AddInboundResponse) + err := grpc.Invoke(ctx, "/v2ray.core.app.proxyman.command.HandlerService/AddInbound", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *handlerServiceClient) RemoveInbound(ctx context.Context, in *RemoveInboundRequest, opts ...grpc.CallOption) (*RemoveInboundResponse, error) { + out := new(RemoveInboundResponse) + err := grpc.Invoke(ctx, "/v2ray.core.app.proxyman.command.HandlerService/RemoveInbound", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *handlerServiceClient) AlterInbound(ctx context.Context, in *AlterInboundRequest, opts ...grpc.CallOption) (*AlterInboundResponse, error) { + out := new(AlterInboundResponse) + err := grpc.Invoke(ctx, "/v2ray.core.app.proxyman.command.HandlerService/AlterInbound", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *handlerServiceClient) AddOutbound(ctx context.Context, in *AddOutboundRequest, opts ...grpc.CallOption) (*AddOutboundResponse, error) { + out := new(AddOutboundResponse) + err := grpc.Invoke(ctx, "/v2ray.core.app.proxyman.command.HandlerService/AddOutbound", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *handlerServiceClient) RemoveOutbound(ctx context.Context, in *RemoveOutboundRequest, opts ...grpc.CallOption) (*RemoveOutboundResponse, error) { + out := new(RemoveOutboundResponse) + err := grpc.Invoke(ctx, "/v2ray.core.app.proxyman.command.HandlerService/RemoveOutbound", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *handlerServiceClient) AlterOutbound(ctx context.Context, in *AlterOutboundRequest, opts ...grpc.CallOption) (*AlterOutboundResponse, error) { + out := new(AlterOutboundResponse) + err := grpc.Invoke(ctx, "/v2ray.core.app.proxyman.command.HandlerService/AlterOutbound", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for HandlerService service + +type HandlerServiceServer interface { + AddInbound(context.Context, *AddInboundRequest) (*AddInboundResponse, error) + RemoveInbound(context.Context, *RemoveInboundRequest) (*RemoveInboundResponse, error) + AlterInbound(context.Context, *AlterInboundRequest) (*AlterInboundResponse, error) + AddOutbound(context.Context, *AddOutboundRequest) (*AddOutboundResponse, error) + RemoveOutbound(context.Context, *RemoveOutboundRequest) (*RemoveOutboundResponse, error) + AlterOutbound(context.Context, *AlterOutboundRequest) (*AlterOutboundResponse, error) +} + +func RegisterHandlerServiceServer(s *grpc.Server, srv HandlerServiceServer) { + s.RegisterService(&_HandlerService_serviceDesc, srv) +} + +func _HandlerService_AddInbound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddInboundRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandlerServiceServer).AddInbound(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.proxyman.command.HandlerService/AddInbound", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandlerServiceServer).AddInbound(ctx, req.(*AddInboundRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HandlerService_RemoveInbound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RemoveInboundRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandlerServiceServer).RemoveInbound(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.proxyman.command.HandlerService/RemoveInbound", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandlerServiceServer).RemoveInbound(ctx, req.(*RemoveInboundRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HandlerService_AlterInbound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AlterInboundRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandlerServiceServer).AlterInbound(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.proxyman.command.HandlerService/AlterInbound", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandlerServiceServer).AlterInbound(ctx, req.(*AlterInboundRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HandlerService_AddOutbound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddOutboundRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandlerServiceServer).AddOutbound(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.proxyman.command.HandlerService/AddOutbound", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandlerServiceServer).AddOutbound(ctx, req.(*AddOutboundRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HandlerService_RemoveOutbound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RemoveOutboundRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandlerServiceServer).RemoveOutbound(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.proxyman.command.HandlerService/RemoveOutbound", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandlerServiceServer).RemoveOutbound(ctx, req.(*RemoveOutboundRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HandlerService_AlterOutbound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AlterOutboundRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandlerServiceServer).AlterOutbound(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.proxyman.command.HandlerService/AlterOutbound", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandlerServiceServer).AlterOutbound(ctx, req.(*AlterOutboundRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _HandlerService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "v2ray.core.app.proxyman.command.HandlerService", + HandlerType: (*HandlerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "AddInbound", + Handler: _HandlerService_AddInbound_Handler, + }, + { + MethodName: "RemoveInbound", + Handler: _HandlerService_RemoveInbound_Handler, + }, + { + MethodName: "AlterInbound", + Handler: _HandlerService_AlterInbound_Handler, + }, + { + MethodName: "AddOutbound", + Handler: _HandlerService_AddOutbound_Handler, + }, + { + MethodName: "RemoveOutbound", + Handler: _HandlerService_RemoveOutbound_Handler, + }, + { + MethodName: "AlterOutbound", + Handler: _HandlerService_AlterOutbound_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "v2ray.com/core/app/proxyman/command/command.proto", +} + +func init() { proto.RegisterFile("v2ray.com/core/app/proxyman/command/command.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 557 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xdf, 0x6b, 0xd3, 0x40, + 0x1c, 0xb7, 0x53, 0xbb, 0xed, 0x3b, 0x1d, 0xf3, 0xda, 0x6e, 0x25, 0x3e, 0x6c, 0x46, 0x90, 0x0d, + 0xe1, 0xa2, 0x59, 0x37, 0x41, 0xf0, 0xa1, 0xd6, 0x87, 0xf9, 0x20, 0x1d, 0x99, 0xfa, 0xe0, 0x8b, + 0xdc, 0x92, 0xb3, 0x04, 0x92, 0xbb, 0xf3, 0x92, 0x56, 0x2b, 0x08, 0x82, 0xff, 0x80, 0x7f, 0x87, + 0x7f, 0xa5, 0x24, 0x77, 0xd7, 0x25, 0x69, 0x21, 0x0d, 0xf8, 0xd4, 0xf4, 0xfa, 0xf9, 0xf5, 0xfd, + 0xde, 0x27, 0x14, 0x9e, 0xcf, 0x5c, 0x49, 0xe6, 0xd8, 0xe7, 0xb1, 0xe3, 0x73, 0x49, 0x1d, 0x22, + 0x84, 0x23, 0x24, 0xff, 0x3e, 0x8f, 0x09, 0x73, 0x7c, 0x1e, 0xc7, 0x84, 0x05, 0xe6, 0x13, 0x0b, + 0xc9, 0x53, 0x8e, 0x0e, 0x0d, 0x45, 0x52, 0x4c, 0x84, 0xc0, 0x06, 0x8e, 0x35, 0xcc, 0x3a, 0xa9, + 0x68, 0x66, 0xe7, 0x9c, 0x39, 0x39, 0xdb, 0xe7, 0x91, 0x33, 0x4d, 0xa8, 0x54, 0x5a, 0xd6, 0xb3, + 0xd5, 0xd0, 0x84, 0xca, 0x90, 0x44, 0x4e, 0x3a, 0x17, 0x34, 0xf8, 0x1c, 0xd3, 0x24, 0x21, 0x13, + 0xaa, 0x19, 0x0f, 0x97, 0x18, 0xec, 0x4b, 0x38, 0x51, 0x3f, 0xda, 0x17, 0xb0, 0x37, 0x0c, 0x82, + 0x0f, 0x09, 0x95, 0x63, 0x41, 0x25, 0x49, 0x43, 0xce, 0xd0, 0x00, 0xee, 0x64, 0x86, 0xfd, 0xd6, + 0x51, 0xeb, 0x78, 0xc7, 0x3d, 0xc2, 0x85, 0xf4, 0xca, 0x0d, 0x9b, 0x60, 0x38, 0x23, 0x7a, 0x39, + 0xda, 0x7e, 0x0a, 0x1d, 0x8f, 0xc6, 0x7c, 0x46, 0xcb, 0x62, 0x5d, 0xb8, 0x4b, 0x63, 0x12, 0x46, + 0xb9, 0xda, 0xb6, 0xa7, 0xbe, 0xd8, 0x63, 0x78, 0x30, 0x0c, 0x82, 0xb7, 0xec, 0x9a, 0x4f, 0x59, + 0xe0, 0xd1, 0xaf, 0x53, 0x9a, 0xa4, 0xe8, 0x25, 0x6c, 0x86, 0xea, 0x64, 0x95, 0xb5, 0x06, 0x5f, + 0x10, 0x16, 0x44, 0x54, 0x8e, 0xf2, 0x21, 0x3c, 0x43, 0xb0, 0xbb, 0x80, 0x8a, 0x82, 0x89, 0xe0, + 0x2c, 0xa1, 0xf6, 0x31, 0x74, 0x55, 0xa6, 0x8a, 0xd3, 0x1e, 0xdc, 0x4e, 0xc9, 0x44, 0x47, 0xca, + 0x1e, 0xed, 0x03, 0xe8, 0x55, 0x90, 0x5a, 0x22, 0x86, 0xce, 0x30, 0x4a, 0xa9, 0xac, 0x53, 0x40, + 0x6f, 0x60, 0x9b, 0x9b, 0xa9, 0xfb, 0x1b, 0x79, 0xfe, 0x27, 0x2b, 0x56, 0xa7, 0x2e, 0x0a, 0xbf, + 0xcf, 0x2e, 0xea, 0x9d, 0xba, 0x27, 0xef, 0x86, 0x68, 0xef, 0x43, 0xb7, 0x6c, 0xa7, 0x63, 0x5c, + 0xe5, 0xf3, 0x8d, 0xa7, 0x69, 0x29, 0xc5, 0x2b, 0xd8, 0xe2, 0xfa, 0x48, 0xaf, 0xec, 0x51, 0xd1, + 0xd2, 0xc0, 0xcb, 0x3b, 0x5b, 0x50, 0xec, 0x1e, 0x74, 0x4a, 0xa2, 0xda, 0xeb, 0xc4, 0xec, 0xa2, + 0x6a, 0xb7, 0xbc, 0xb6, 0x3e, 0xec, 0x57, 0xa1, 0x5a, 0x84, 0xe9, 0x41, 0x6a, 0x35, 0xfe, 0xd3, + 0xe2, 0x0e, 0xa0, 0x57, 0xf1, 0xd3, 0x41, 0xb6, 0xa0, 0xad, 0x06, 0x77, 0xff, 0xb4, 0x61, 0x57, + 0xaf, 0xe2, 0x8a, 0xca, 0x59, 0xe8, 0x53, 0xf4, 0x0d, 0xe0, 0xa6, 0x36, 0xc8, 0xc5, 0x35, 0x2f, + 0x2a, 0x5e, 0x2a, 0xad, 0x75, 0xda, 0x88, 0xa3, 0x33, 0xdd, 0x42, 0xbf, 0x5a, 0x70, 0xbf, 0x54, + 0x38, 0x74, 0x56, 0x2b, 0xb4, 0xaa, 0xca, 0xd6, 0x79, 0x53, 0xda, 0x22, 0xc2, 0x4f, 0xb8, 0x57, + 0xac, 0x1a, 0x1a, 0xd4, 0x4f, 0xb2, 0xfc, 0x22, 0x58, 0x67, 0x0d, 0x59, 0x0b, 0xfb, 0x1f, 0xb0, + 0x53, 0x28, 0x1f, 0x5a, 0x6b, 0x8f, 0x95, 0x32, 0x59, 0x83, 0x66, 0xa4, 0x85, 0xf7, 0xef, 0x16, + 0xec, 0x96, 0x7b, 0x8b, 0xd6, 0xdd, 0x63, 0x35, 0xc2, 0x8b, 0xc6, 0xbc, 0x52, 0x07, 0x4a, 0x9d, + 0x45, 0x6b, 0x2e, 0xb3, 0x9a, 0xe1, 0xbc, 0x29, 0xcd, 0x44, 0x78, 0xed, 0xc1, 0x63, 0x9f, 0xc7, + 0x75, 0xf4, 0xcb, 0xd6, 0xa7, 0x4d, 0xfd, 0xf8, 0x77, 0xe3, 0xf0, 0xa3, 0xeb, 0x91, 0x39, 0x1e, + 0x65, 0xe0, 0xa1, 0x10, 0xf8, 0xd2, 0x80, 0x47, 0x0a, 0x71, 0xdd, 0xce, 0xff, 0x1d, 0x4e, 0xff, + 0x05, 0x00, 0x00, 0xff, 0xff, 0x2f, 0x05, 0xaa, 0x44, 0x29, 0x07, 0x00, 0x00, +} diff --git a/app/proxyman/command/command.proto b/app/proxyman/command/command.proto new file mode 100644 index 000000000..c40c8ec25 --- /dev/null +++ b/app/proxyman/command/command.proto @@ -0,0 +1,80 @@ +syntax = "proto3"; + +package v2ray.core.app.proxyman.command; +option csharp_namespace = "V2Ray.Core.App.Proxyman.Command"; +option go_package = "command"; +option java_package = "com.v2ray.core.app.proxyman.command"; +option java_multiple_files = true; + +import "v2ray.com/core/common/protocol/user.proto"; +import "v2ray.com/core/common/serial/typed_message.proto"; +import "v2ray.com/core/config.proto"; + +message AddUserOperation { + v2ray.core.common.protocol.User user = 1; +} + +message RemoveUserOperation { + string email = 1; +} + +message AddInboundRequest { + core.InboundHandlerConfig inbound = 1; +} + +message AddInboundResponse{ + +} + +message RemoveInboundRequest { + string tag = 1; +} + +message RemoveInboundResponse {} + +message AlterInboundRequest { + string tag = 1; + v2ray.core.common.serial.TypedMessage operation = 2; +} + +message AlterInboundResponse { +} + +message AddOutboundRequest { + core.OutboundHandlerConfig outbound = 1; +} + +message AddOutboundResponse { + +} + +message RemoveOutboundRequest { + string tag = 1; +} + +message RemoveOutboundResponse { +} + +message AlterOutboundRequest { + string tag = 1; + v2ray.core.common.serial.TypedMessage operation = 2; +} + +message AlterOutboundResponse { +} + +service HandlerService { + rpc AddInbound(AddInboundRequest) returns (AddInboundResponse) {} + + rpc RemoveInbound(RemoveInboundRequest) returns (RemoveInboundResponse) {} + + rpc AlterInbound(AlterInboundRequest) returns (AlterInboundResponse) {} + + rpc AddOutbound(AddOutboundRequest) returns (AddOutboundResponse) {} + + rpc RemoveOutbound(RemoveOutboundRequest) returns (RemoveOutboundResponse) {} + + rpc AlterOutbound(AlterOutboundRequest) returns (AlterOutboundResponse) {} +} + +message Config {} \ No newline at end of file diff --git a/app/proxyman/command/doc.go b/app/proxyman/command/doc.go new file mode 100644 index 000000000..8c18efaf4 --- /dev/null +++ b/app/proxyman/command/doc.go @@ -0,0 +1,3 @@ +package command + +//go:generate go run $GOPATH/src/v2ray.com/core/common/errors/errorgen/main.go -pkg command -path App,Proxyman,Command diff --git a/app/proxyman/command/errors.generated.go b/app/proxyman/command/errors.generated.go new file mode 100644 index 000000000..3cbe9949a --- /dev/null +++ b/app/proxyman/command/errors.generated.go @@ -0,0 +1,5 @@ +package command + +import "v2ray.com/core/common/errors" + +func newError(values ...interface{}) *errors.Error { return errors.New(values...).Path("App", "Proxyman", "Command") } diff --git a/app/proxyman/inbound/always.go b/app/proxyman/inbound/always.go index 7a49ce592..ed0855743 100644 --- a/app/proxyman/inbound/always.go +++ b/app/proxyman/inbound/always.go @@ -93,3 +93,7 @@ func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, func (h *AlwaysOnInboundHandler) Tag() string { return h.tag } + +func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound { + return h.proxy +} diff --git a/app/proxyman/inbound/inbound.go b/app/proxyman/inbound/inbound.go index f52a4cdae..2cf37dedb 100644 --- a/app/proxyman/inbound/inbound.go +++ b/app/proxyman/inbound/inbound.go @@ -4,6 +4,7 @@ package inbound import ( "context" + "sync" "v2ray.com/core" "v2ray.com/core/app/proxyman" @@ -12,8 +13,9 @@ import ( // Manager is to manage all inbound handlers. type Manager struct { - handlers []core.InboundHandler - taggedHandlers map[string]core.InboundHandler + sync.RWMutex + untaggedHandler []core.InboundHandler + taggedHandlers map[string]core.InboundHandler } func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) { @@ -31,15 +33,22 @@ func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) } func (m *Manager) AddHandler(ctx context.Context, handler core.InboundHandler) error { - m.handlers = append(m.handlers, handler) + m.Lock() + defer m.Unlock() + tag := handler.Tag() if len(tag) > 0 { m.taggedHandlers[tag] = handler + } else { + m.untaggedHandler = append(m.untaggedHandler, handler) } return nil } func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandler, error) { + m.RLock() + defer m.RUnlock() + handler, found := m.taggedHandlers[tag] if !found { return nil, newError("handler not found: ", tag) @@ -47,8 +56,31 @@ func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandl return handler, nil } +func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { + if len(tag) == 0 { + return core.ErrNoClue + } + + m.Lock() + defer m.Unlock() + + if handler, found := m.taggedHandlers[tag]; found { + handler.Close() + delete(m.taggedHandlers, tag) + return nil + } + + return core.ErrNoClue +} + func (m *Manager) Start() error { - for _, handler := range m.handlers { + for _, handler := range m.taggedHandlers { + if err := handler.Start(); err != nil { + return err + } + } + + for _, handler := range m.untaggedHandler { if err := handler.Start(); err != nil { return err } @@ -57,7 +89,10 @@ func (m *Manager) Start() error { } func (m *Manager) Close() { - for _, handler := range m.handlers { + for _, handler := range m.taggedHandlers { + handler.Close() + } + for _, handler := range m.untaggedHandler { handler.Close() } } diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index f65ac70e5..bef922d9c 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -105,7 +105,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn ctx = proxy.ContextWithTarget(ctx, dest) stream := ray.NewRay(ctx) go handler.Dispatch(ctx, stream) - return ray.NewConnection(stream, zeroAddr, zeroAddr), nil + return ray.NewConnection(stream.InboundOutput(), stream.InboundInput()), nil } newError("failed to get outbound handler with tag: ", tag).AtWarning().WriteToLog() @@ -122,3 +122,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn return internet.Dial(ctx, dest) } + +func (h *Handler) GetOutbound() proxy.Outbound { + return h.proxy +} diff --git a/app/proxyman/outbound/outbound.go b/app/proxyman/outbound/outbound.go index 6fb68866d..747ebaa7b 100644 --- a/app/proxyman/outbound/outbound.go +++ b/app/proxyman/outbound/outbound.go @@ -73,6 +73,21 @@ func (m *Manager) AddHandler(ctx context.Context, handler core.OutboundHandler) return nil } +func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { + if len(tag) == 0 { + return core.ErrNoClue + } + m.Lock() + defer m.Unlock() + + delete(m.taggedHandler, tag) + if m.defaultHandler.Tag() == tag { + m.defaultHandler = nil + } + + return nil +} + func init() { common.Must(common.RegisterConfig((*proxyman.OutboundConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { return New(ctx, config.(*proxyman.OutboundConfig)) diff --git a/dial.go b/dial.go index aef5d8f2a..1819bcafd 100644 --- a/dial.go +++ b/dial.go @@ -16,5 +16,5 @@ func Dial(ctx context.Context, v *Instance, dest net.Destination) (net.Conn, err if err != nil { return nil, err } - return ray.NewConnection(r, &net.TCPAddr{IP: []byte{0, 0, 0, 0}}, &net.TCPAddr{IP: []byte{0, 0, 0, 0}}), nil + return ray.NewConnection(r.InboundOutput(), r.InboundInput()), nil } diff --git a/network.go b/network.go index 7524f7e4c..1d2f50910 100644 --- a/network.go +++ b/network.go @@ -32,6 +32,9 @@ type InboundHandlerManager interface { GetHandler(ctx context.Context, tag string) (InboundHandler, error) // AddHandler adds the given handler into this InboundHandlerManager. AddHandler(ctx context.Context, handler InboundHandler) error + + // RemoveHandler removes a handler from InboundHandlerManager. + RemoveHandler(ctx context.Context, tag string) error } type syncInboundHandlerManager struct { @@ -97,6 +100,9 @@ type OutboundHandlerManager interface { GetDefaultHandler() OutboundHandler // AddHandler adds a handler into this OutboundHandlerManager. AddHandler(ctx context.Context, handler OutboundHandler) error + + // RemoveHandler removes a handler from OutboundHandlerManager. + RemoveHandler(ctx context.Context, tag string) error } type syncOutboundHandlerManager struct { diff --git a/proxy/proxy.go b/proxy/proxy.go index 50162378e..1f7de5b56 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -12,6 +12,7 @@ import ( "v2ray.com/core" "v2ray.com/core/common/net" + "v2ray.com/core/common/protocol" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/ray" ) @@ -36,3 +37,20 @@ type Dialer interface { // Dial dials a system connection to the given destination. Dial(ctx context.Context, destination net.Destination) (internet.Connection, error) } + +// UserManager is the interface for Inbounds and Outbounds that can manage their users. +type UserManager interface { + // AddUser adds a new user. + AddUser(context.Context, *protocol.User) error + + // RemoveUser removes an user by email. + RemoveUser(context.Context, string) error +} + +type GetInbound interface { + GetInbound() Inbound +} + +type GetOutbound interface { + GetOutbound() Outbound +} \ No newline at end of file diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 9949383a2..a7b358d27 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -124,6 +124,10 @@ func (h *Handler) AddUser(ctx context.Context, user *protocol.User) error { return h.clients.Add(user) } +func (h *Handler) RemoveUser(ctx context.Context, email string) error { + return newError("not implemented") +} + func transferRequest(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error { defer output.Close() diff --git a/transport/ray/connection.go b/transport/ray/connection.go index 4a02443ff..2bc4d39b9 100644 --- a/transport/ray/connection.go +++ b/transport/ray/connection.go @@ -6,29 +6,59 @@ import ( "time" "v2ray.com/core/common/buf" + "v2ray.com/core/common/signal" ) -type connection struct { - stream InboundRay - closed bool - localAddr net.Addr - remoteAddr net.Addr +type ConnectionOption func(*connection) - reader *buf.BufferedReader - writer buf.Writer +func ConnLocalAddr(addr net.Addr) ConnectionOption { + return func(c *connection) { + c.localAddr = addr + } } -// NewConnection wraps a Ray into net.Conn. -func NewConnection(stream InboundRay, localAddr net.Addr, remoteAddr net.Addr) net.Conn { - return &connection{ - stream: stream, - localAddr: localAddr, - remoteAddr: remoteAddr, - reader: buf.NewBufferedReader(stream.InboundOutput()), - writer: stream.InboundInput(), +func ConnRemoteAddr(addr net.Addr) ConnectionOption { + return func(c *connection) { + c.remoteAddr = addr } } +func ConnCloseSignal(s *signal.Notifier) ConnectionOption { + return func(c *connection) { + c.closeSignal = s + } +} + +type connection struct { + input InputStream + output OutputStream + closed bool + localAddr net.Addr + remoteAddr net.Addr + closeSignal *signal.Notifier + + reader *buf.BufferedReader +} + +var zeroAddr net.Addr = &net.TCPAddr{IP: []byte{0, 0, 0, 0}} + +// NewConnection wraps a Ray into net.Conn. +func NewConnection(input InputStream, output OutputStream, options ...ConnectionOption) net.Conn { + c := &connection{ + input: input, + output: output, + localAddr: zeroAddr, + remoteAddr: zeroAddr, + reader: buf.NewBufferedReader(input), + } + + for _, opt := range options { + opt(c) + } + + return c +} + // Read implements net.Conn.Read(). func (c *connection) Read(b []byte) (int, error) { if c.closed { @@ -51,7 +81,7 @@ func (c *connection) Write(b []byte) (int, error) { l := len(b) mb := buf.NewMultiBufferCap(l/buf.Size + 1) mb.Write(b) - return l, c.writer.WriteMultiBuffer(mb) + return l, c.output.WriteMultiBuffer(mb) } func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -59,14 +89,17 @@ func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error { return io.ErrClosedPipe } - return c.writer.WriteMultiBuffer(mb) + return c.output.WriteMultiBuffer(mb) } // Close implements net.Conn.Close(). func (c *connection) Close() error { c.closed = true - c.stream.InboundInput().Close() - c.stream.InboundOutput().CloseError() + c.output.Close() + c.input.CloseError() + if c.closeSignal != nil { + c.closeSignal.Signal() + } return nil } diff --git a/v2ray.go b/v2ray.go index 542deef8a..18de4e728 100644 --- a/v2ray.go +++ b/v2ray.go @@ -46,20 +46,18 @@ func New(config *Config) (*Instance, error) { return nil, err } - ctx := context.WithValue(context.Background(), v2rayKey, server) - for _, appSettings := range config.App { settings, err := appSettings.GetInstance() if err != nil { return nil, err } - if _, err := common.CreateObject(ctx, settings); err != nil { + if _, err := server.CreateObject(settings); err != nil { return nil, err } } for _, inbound := range config.Inbound { - rawHandler, err := common.CreateObject(ctx, inbound) + rawHandler, err := server.CreateObject(inbound) if err != nil { return nil, err } @@ -67,13 +65,13 @@ func New(config *Config) (*Instance, error) { if !ok { return nil, newError("not an InboundHandler") } - if err := server.InboundHandlerManager().AddHandler(ctx, handler); err != nil { + if err := server.InboundHandlerManager().AddHandler(context.Background(), handler); err != nil { return nil, err } } for _, outbound := range config.Outbound { - rawHandler, err := common.CreateObject(ctx, outbound) + rawHandler, err := server.CreateObject(outbound) if err != nil { return nil, err } @@ -81,7 +79,7 @@ func New(config *Config) (*Instance, error) { if !ok { return nil, newError("not an OutboundHandler") } - if err := server.OutboundHandlerManager().AddHandler(ctx, handler); err != nil { + if err := server.OutboundHandlerManager().AddHandler(context.Background(), handler); err != nil { return nil, err } } @@ -89,6 +87,11 @@ func New(config *Config) (*Instance, error) { return server, nil } +func (s *Instance) CreateObject(config interface{}) (interface{}, error) { + ctx := context.WithValue(context.Background(), v2rayKey, s) + return common.CreateObject(ctx, config) +} + // ID returns an unique ID for this V2Ray instance. func (s *Instance) ID() uuid.UUID { return s.id