From 6f913b31e229b996ffa7616b2d2e37a117a79aad Mon Sep 17 00:00:00 2001 From: Shelikhoo Date: Sat, 6 Mar 2021 13:33:20 +0000 Subject: [PATCH] structure for observatory --- app/observatory/command/command.proto | 24 ++ app/observatory/config.pb.go | 324 ++++++++++++++++++++++++++ app/observatory/config.proto | 51 ++++ app/observatory/errors.generated.go | 9 + app/observatory/observatory.go | 3 + app/observatory/observer.go | 53 +++++ features/extension/observatory.go | 17 ++ 7 files changed, 481 insertions(+) create mode 100644 app/observatory/command/command.proto create mode 100644 app/observatory/config.pb.go create mode 100644 app/observatory/config.proto create mode 100644 app/observatory/errors.generated.go create mode 100644 app/observatory/observatory.go create mode 100644 app/observatory/observer.go create mode 100644 features/extension/observatory.go diff --git a/app/observatory/command/command.proto b/app/observatory/command/command.proto new file mode 100644 index 000000000..74c71269a --- /dev/null +++ b/app/observatory/command/command.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package v2ray.core.app.observatory.command; +option csharp_namespace = "V2Ray.Core.App.Observatory.Command"; +option go_package = "github.com/v2fly/v2ray-core/v4/app/observatory/command"; +option java_package = "com.v2ray.core.app.observatory.command"; +option java_multiple_files = true; + +import "app/observatory/config.proto"; + +message GetOutboundStatusRequest { +} + +message GetOutboundStatusResponse { + v2ray.core.app.observatory.ObservationResult status = 1; +} + +service ObservatoryService { + rpc GetOutboundStatus(GetOutboundStatusRequest) + returns (GetOutboundStatusResponse) {} +} + + +message Config {} \ No newline at end of file diff --git a/app/observatory/config.pb.go b/app/observatory/config.pb.go new file mode 100644 index 000000000..693ab42e1 --- /dev/null +++ b/app/observatory/config.pb.go @@ -0,0 +1,324 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.13.0 +// source: app/observatory/config.proto + +package observatory + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type OutboundStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document Whether this outbound is usable + //@Restriction ReadOnlyForUser + Alive bool `protobuf:"varint,1,opt,name=alive,proto3" json:"alive,omitempty"` + // @Document The time for probe request to finish. + //@Type time.ms + //@Restriction ReadOnlyForUser + Delay uint32 `protobuf:"varint,2,opt,name=delay,proto3" json:"delay,omitempty"` + // @Document The last error caused this outbound failed to relay probe request + //@Restriction NotMachineReadable + LastErrorReason string `protobuf:"bytes,3,opt,name=last_error_reason,json=lastErrorReason,proto3" json:"last_error_reason,omitempty"` + // @Document The outbound tag for this Server + //@Type id.outboundTag + OutboundTag string `protobuf:"bytes,4,opt,name=outbound_tag,json=outboundTag,proto3" json:"outbound_tag,omitempty"` +} + +func (x *OutboundStatus) Reset() { + *x = OutboundStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *OutboundStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OutboundStatus) ProtoMessage() {} + +func (x *OutboundStatus) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OutboundStatus.ProtoReflect.Descriptor instead. +func (*OutboundStatus) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{0} +} + +func (x *OutboundStatus) GetAlive() bool { + if x != nil { + return x.Alive + } + return false +} + +func (x *OutboundStatus) GetDelay() uint32 { + if x != nil { + return x.Delay + } + return 0 +} + +func (x *OutboundStatus) GetLastErrorReason() string { + if x != nil { + return x.LastErrorReason + } + return "" +} + +func (x *OutboundStatus) GetOutboundTag() string { + if x != nil { + return x.OutboundTag + } + return "" +} + +type Intensity struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document The time interval for a probe request in ms. + //@Type time.ms + ProbeInterval uint32 `protobuf:"varint,1,opt,name=probe_interval,json=probeInterval,proto3" json:"probe_interval,omitempty"` +} + +func (x *Intensity) Reset() { + *x = Intensity{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Intensity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Intensity) ProtoMessage() {} + +func (x *Intensity) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Intensity.ProtoReflect.Descriptor instead. +func (*Intensity) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{1} +} + +func (x *Intensity) GetProbeInterval() uint32 { + if x != nil { + return x.ProbeInterval + } + return 0 +} + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document The selectors for outbound under observation + SubjectSelector []string `protobuf:"bytes,2,rep,name=subject_selector,json=subjectSelector,proto3" json:"subject_selector,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{2} +} + +func (x *Config) GetSubjectSelector() []string { + if x != nil { + return x.SubjectSelector + } + return nil +} + +var File_app_observatory_config_proto protoreflect.FileDescriptor + +var file_app_observatory_config_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, + 0x79, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1a, + 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, + 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x22, 0x8b, 0x01, 0x0a, 0x0e, 0x4f, + 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, + 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x6c, + 0x69, 0x76, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x2a, 0x0a, 0x11, 0x6c, 0x61, 0x73, + 0x74, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x6c, 0x61, 0x73, 0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, + 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, + 0x64, 0x5f, 0x74, 0x61, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6f, 0x75, 0x74, + 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x22, 0x32, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, + 0x6e, 0x73, 0x69, 0x74, 0x79, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x5f, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x70, + 0x72, 0x6f, 0x62, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x22, 0x33, 0x0a, 0x06, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x29, 0x0a, 0x10, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, + 0x74, 0x5f, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x0f, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x42, 0x6f, 0x0a, 0x1e, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, + 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, + 0x6f, 0x72, 0x79, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, + 0x72, 0x65, 0x2f, 0x76, 0x34, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, + 0x61, 0x74, 0x6f, 0x72, 0x79, 0xaa, 0x02, 0x1a, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, 0x6f, + 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, + 0x72, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_app_observatory_config_proto_rawDescOnce sync.Once + file_app_observatory_config_proto_rawDescData = file_app_observatory_config_proto_rawDesc +) + +func file_app_observatory_config_proto_rawDescGZIP() []byte { + file_app_observatory_config_proto_rawDescOnce.Do(func() { + file_app_observatory_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_observatory_config_proto_rawDescData) + }) + return file_app_observatory_config_proto_rawDescData +} + +var file_app_observatory_config_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_app_observatory_config_proto_goTypes = []interface{}{ + (*OutboundStatus)(nil), // 0: v2ray.core.app.observatory.OutboundStatus + (*Intensity)(nil), // 1: v2ray.core.app.observatory.Intensity + (*Config)(nil), // 2: v2ray.core.app.observatory.Config +} +var file_app_observatory_config_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_app_observatory_config_proto_init() } +func file_app_observatory_config_proto_init() { + if File_app_observatory_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_app_observatory_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*OutboundStatus); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Intensity); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_config_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_app_observatory_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_app_observatory_config_proto_goTypes, + DependencyIndexes: file_app_observatory_config_proto_depIdxs, + MessageInfos: file_app_observatory_config_proto_msgTypes, + }.Build() + File_app_observatory_config_proto = out.File + file_app_observatory_config_proto_rawDesc = nil + file_app_observatory_config_proto_goTypes = nil + file_app_observatory_config_proto_depIdxs = nil +} diff --git a/app/observatory/config.proto b/app/observatory/config.proto new file mode 100644 index 000000000..b06e96d91 --- /dev/null +++ b/app/observatory/config.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package v2ray.core.app.observatory; +option csharp_namespace = "V2Ray.Core.App.Observatory"; +option go_package = "github.com/v2fly/v2ray-core/v4/app/observatory"; +option java_package = "com.v2ray.core.app.observatory"; +option java_multiple_files = true; + +message ObservationResult { + repeated OutboundStatus status = 1; +} + +message OutboundStatus{ + /* @Document Whether this outbound is usable + @Restriction ReadOnlyForUser + */ + bool alive = 1; + /* @Document The time for probe request to finish. + @Type time.ms + @Restriction ReadOnlyForUser + */ + uint32 delay = 2; + /* @Document The last error caused this outbound failed to relay probe request + @Restriction NotMachineReadable + */ + string last_error_reason = 3; + /* @Document The outbound tag for this Server + @Type id.outboundTag + */ + string outbound_tag = 4; + /* @Document The time this outbound is known to be alive + @Type id.outboundTag +*/ + int64 last_seen_time = 5; + /* @Document The time this outbound is tried + @Type id.outboundTag +*/ + int64 last_try_time = 6; +} + +message Intensity{ + /* @Document The time interval for a probe request in ms. + @Type time.ms + */ + uint32 probe_interval = 1; +} +message Config { + /* @Document The selectors for outbound under observation + */ + repeated string subject_selector = 2; +} \ No newline at end of file diff --git a/app/observatory/errors.generated.go b/app/observatory/errors.generated.go new file mode 100644 index 000000000..05169e048 --- /dev/null +++ b/app/observatory/errors.generated.go @@ -0,0 +1,9 @@ +package observatory + +import "github.com/v2fly/v2ray-core/v4/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/app/observatory/observatory.go b/app/observatory/observatory.go new file mode 100644 index 000000000..937bcf96f --- /dev/null +++ b/app/observatory/observatory.go @@ -0,0 +1,3 @@ +package observatory + +//go:generate go run github.com/v2fly/v2ray-core/v4/common/errors/errorgen diff --git a/app/observatory/observer.go b/app/observatory/observer.go new file mode 100644 index 000000000..ca069387b --- /dev/null +++ b/app/observatory/observer.go @@ -0,0 +1,53 @@ +package observatory + +import ( + "context" + "github.com/v2fly/v2ray-core/v4/common/signal/done" + "github.com/v2fly/v2ray-core/v4/features/extension" + "github.com/v2fly/v2ray-core/v4/features/outbound" + "sync" +) + +type Observer struct { + config *Config + ctx context.Context + + statusLock sync.Mutex + status []OutboundStatus + + finished *done.Instance + + ohm outbound.Manager +} + +func (o *Observer) Type() interface{} { + return extension.ObservatoryType() +} + +func (o *Observer) Start() error { + o.finished = done.New() + go o.background() + return nil +} + +func (o *Observer) Close() error { + return o.finished.Close() +} + +func (o *Observer) background() { + for !o.finished.Done() { + hs, ok := o.ohm.(outbound.HandlerSelector) + if !ok { + newError("outbound.Manager is not a HandlerSelector").WriteToLog() + return + } + outbounds := hs.Select(o.config.SubjectSelector) + + } +} +func (o *Observer) updateStatus(outbounds []string) { + o.statusLock.Lock() + defer o.statusLock.Unlock() + + o.status +} diff --git a/features/extension/observatory.go b/features/extension/observatory.go new file mode 100644 index 000000000..ca3bc751c --- /dev/null +++ b/features/extension/observatory.go @@ -0,0 +1,17 @@ +package extension + +import ( + "context" + "github.com/golang/protobuf/proto" + "github.com/v2fly/v2ray-core/v4/features" +) + +type Observatory interface { + features.Feature + + GetObservation(ctx context.Context) (proto.Message, error) +} + +func ObservatoryType() interface{} { + return (*Observatory)(nil) +}