diff --git a/main/distro/all/all.go b/main/distro/all/all.go index efe8042cb..32a6489d5 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -65,6 +65,12 @@ import ( _ "github.com/v2fly/v2ray-core/v5/transport/internet/udp" _ "github.com/v2fly/v2ray-core/v5/transport/internet/websocket" + // Developer preview transports + _ "github.com/v2fly/v2ray-core/v5/transport/internet/request/assembly" + + _ "github.com/v2fly/v2ray-core/v5/transport/internet/request/assembler/simple" + _ "github.com/v2fly/v2ray-core/v5/transport/internet/request/roundtripper/httprt" + // Transport headers _ "github.com/v2fly/v2ray-core/v5/transport/internet/headers/http" _ "github.com/v2fly/v2ray-core/v5/transport/internet/headers/noop" diff --git a/transport/internet/request/assembler.go b/transport/internet/request/assembler.go new file mode 100644 index 000000000..58e1e01f4 --- /dev/null +++ b/transport/internet/request/assembler.go @@ -0,0 +1,22 @@ +package request + +import ( + "io" +) + +type SessionAssemblerClient interface { + SessionCreator + TransportClientAssemblyReceiver +} +type SessionAssemblerServer interface { + TripperReceiver + TransportServerAssemblyReceiver +} + +type SessionOption interface { + RoundTripperOption() +} + +type Session interface { + io.ReadWriteCloser +} diff --git a/transport/internet/request/assembler/simple/client.go b/transport/internet/request/assembler/simple/client.go new file mode 100644 index 000000000..770fc69c3 --- /dev/null +++ b/transport/internet/request/assembler/simple/client.go @@ -0,0 +1,173 @@ +package simple + +import ( + "bytes" + "context" + "crypto/rand" + "io" + "time" + + "github.com/v2fly/v2ray-core/v5/common" + + "github.com/v2fly/v2ray-core/v5/transport/internet/request" +) + +func newClient(config *ClientConfig) request.SessionAssemblerClient { + return &simpleAssemblerClient{config: config} +} + +type simpleAssemblerClient struct { + assembly request.TransportClientAssembly + config *ClientConfig +} + +func (s *simpleAssemblerClient) OnTransportClientAssemblyReady(assembly request.TransportClientAssembly) { + s.assembly = assembly +} + +func (s *simpleAssemblerClient) NewSession(ctx context.Context, opts ...request.SessionOption) (request.Session, error) { + sessionID := make([]byte, 16) + _, err := io.ReadFull(rand.Reader, sessionID) + if err != nil { + return nil, err + } + sessionContext, finish := context.WithCancel(ctx) + session := &simpleAssemblerClientSession{ + sessionID: sessionID, tripper: s.assembly.Tripper(), readBuffer: bytes.NewBuffer(nil), + ctx: sessionContext, finish: finish, writerChan: make(chan []byte), readerChan: make(chan []byte, 16), assembler: s} + go session.keepRunning() + return session, nil +} + +type simpleAssemblerClientSession struct { + sessionID []byte + currentWriteWait int + + assembler *simpleAssemblerClient + tripper request.Tripper + readBuffer *bytes.Buffer + writerChan chan []byte + readerChan chan []byte + ctx context.Context + finish func() +} + +func (s *simpleAssemblerClientSession) keepRunning() { + s.currentWriteWait = int(s.assembler.config.InitialPollingIntervalMs) + for s.ctx.Err() == nil { + s.runOnce() + } +} + +func (s *simpleAssemblerClientSession) runOnce() { + sendBuffer := bytes.NewBuffer(nil) + if s.currentWriteWait != 0 { + waitTimer := time.NewTimer(time.Millisecond * time.Duration(s.currentWriteWait)) + waitForFirstWrite := true + copyFromWriterLoop: + for { + select { + case <-s.ctx.Done(): + return + case data := <-s.writerChan: + sendBuffer.Write(data) + if sendBuffer.Len() >= int(s.assembler.config.MaxWriteSize) { + break copyFromWriterLoop + } + if waitForFirstWrite { + waitForFirstWrite = false + waitTimer.Reset(time.Millisecond * time.Duration(s.assembler.config.WaitSubsequentWriteMs)) + } + case <-waitTimer.C: + break copyFromWriterLoop + } + } + waitTimer.Stop() + } + + firstRound := true + pollConnection := true + for sendBuffer.Len() != 0 || firstRound { + firstRound = false + sendAmount := sendBuffer.Len() + if sendAmount > int(s.assembler.config.MaxWriteSize) { + sendAmount = int(s.assembler.config.MaxWriteSize) + } + data := sendBuffer.Next(sendAmount) + if len(data) != 0 { + pollConnection = false + } + for { + resp, err := s.tripper.RoundTrip(s.ctx, request.Request{Data: data, ConnectionTag: s.sessionID}) + if err != nil { + newError("failed to send data").Base(err).WriteToLog() + if s.ctx.Err() != nil { + return + } + time.Sleep(time.Millisecond * time.Duration(s.assembler.config.FailedRetryIntervalMs)) + continue + } + if len(resp.Data) != 0 { + s.readerChan <- resp.Data + } + if len(resp.Data) != 0 { + pollConnection = false + } + break + } + } + if pollConnection { + s.currentWriteWait = int(s.assembler.config.BackoffFactor * float32(s.currentWriteWait)) + if s.currentWriteWait > int(s.assembler.config.MaxPollingIntervalMs) { + s.currentWriteWait = int(s.assembler.config.MaxPollingIntervalMs) + } + if s.currentWriteWait < int(s.assembler.config.MinPollingIntervalMs) { + s.currentWriteWait = int(s.assembler.config.MinPollingIntervalMs) + } + } else { + s.currentWriteWait = int(0) + } +} + +func (s *simpleAssemblerClientSession) Read(p []byte) (n int, err error) { + if s.readBuffer.Len() == 0 { + select { + case <-s.ctx.Done(): + return 0, s.ctx.Err() + case data := <-s.readerChan: + s.readBuffer.Write(data) + } + } + n, err = s.readBuffer.Read(p) + if err == io.EOF { + s.readBuffer.Reset() + return 0, nil + } + return +} + +func (s *simpleAssemblerClientSession) Write(p []byte) (n int, err error) { + buf := make([]byte, len(p)) + copy(buf, p) + select { + case <-s.ctx.Done(): + return 0, s.ctx.Err() + case s.writerChan <- buf: + return len(p), nil + } +} + +func (s *simpleAssemblerClientSession) Close() error { + s.finish() + return nil +} + +func init() { + common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + clientConfig, ok := config.(*ClientConfig) + if !ok { + return nil, newError("not a ClientConfig") + } + return newClient(clientConfig), nil + })) +} diff --git a/transport/internet/request/assembler/simple/config.pb.go b/transport/internet/request/assembler/simple/config.pb.go new file mode 100644 index 000000000..cb9daf866 --- /dev/null +++ b/transport/internet/request/assembler/simple/config.pb.go @@ -0,0 +1,295 @@ +package simple + +import ( + _ "github.com/v2fly/v2ray-core/v5/common/protoext" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ClientConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MaxWriteSize int32 `protobuf:"varint,1,opt,name=max_write_size,json=maxWriteSize,proto3" json:"max_write_size,omitempty"` + WaitSubsequentWriteMs int32 `protobuf:"varint,2,opt,name=wait_subsequent_write_ms,json=waitSubsequentWriteMs,proto3" json:"wait_subsequent_write_ms,omitempty"` + InitialPollingIntervalMs int32 `protobuf:"varint,3,opt,name=initial_polling_interval_ms,json=initialPollingIntervalMs,proto3" json:"initial_polling_interval_ms,omitempty"` + MaxPollingIntervalMs int32 `protobuf:"varint,4,opt,name=max_polling_interval_ms,json=maxPollingIntervalMs,proto3" json:"max_polling_interval_ms,omitempty"` + MinPollingIntervalMs int32 `protobuf:"varint,5,opt,name=min_polling_interval_ms,json=minPollingIntervalMs,proto3" json:"min_polling_interval_ms,omitempty"` + BackoffFactor float32 `protobuf:"fixed32,6,opt,name=backoff_factor,json=backoffFactor,proto3" json:"backoff_factor,omitempty"` + FailedRetryIntervalMs int32 `protobuf:"varint,7,opt,name=failed_retry_interval_ms,json=failedRetryIntervalMs,proto3" json:"failed_retry_interval_ms,omitempty"` +} + +func (x *ClientConfig) Reset() { + *x = ClientConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_request_assembler_simple_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClientConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClientConfig) ProtoMessage() {} + +func (x *ClientConfig) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_request_assembler_simple_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClientConfig.ProtoReflect.Descriptor instead. +func (*ClientConfig) Descriptor() ([]byte, []int) { + return file_transport_internet_request_assembler_simple_config_proto_rawDescGZIP(), []int{0} +} + +func (x *ClientConfig) GetMaxWriteSize() int32 { + if x != nil { + return x.MaxWriteSize + } + return 0 +} + +func (x *ClientConfig) GetWaitSubsequentWriteMs() int32 { + if x != nil { + return x.WaitSubsequentWriteMs + } + return 0 +} + +func (x *ClientConfig) GetInitialPollingIntervalMs() int32 { + if x != nil { + return x.InitialPollingIntervalMs + } + return 0 +} + +func (x *ClientConfig) GetMaxPollingIntervalMs() int32 { + if x != nil { + return x.MaxPollingIntervalMs + } + return 0 +} + +func (x *ClientConfig) GetMinPollingIntervalMs() int32 { + if x != nil { + return x.MinPollingIntervalMs + } + return 0 +} + +func (x *ClientConfig) GetBackoffFactor() float32 { + if x != nil { + return x.BackoffFactor + } + return 0 +} + +func (x *ClientConfig) GetFailedRetryIntervalMs() int32 { + if x != nil { + return x.FailedRetryIntervalMs + } + return 0 +} + +type ServerConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MaxWriteSize int32 `protobuf:"varint,1,opt,name=max_write_size,json=maxWriteSize,proto3" json:"max_write_size,omitempty"` +} + +func (x *ServerConfig) Reset() { + *x = ServerConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_request_assembler_simple_config_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerConfig) ProtoMessage() {} + +func (x *ServerConfig) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_request_assembler_simple_config_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerConfig.ProtoReflect.Descriptor instead. +func (*ServerConfig) Descriptor() ([]byte, []int) { + return file_transport_internet_request_assembler_simple_config_proto_rawDescGZIP(), []int{1} +} + +func (x *ServerConfig) GetMaxWriteSize() int32 { + if x != nil { + return x.MaxWriteSize + } + return 0 +} + +var File_transport_internet_request_assembler_simple_config_proto protoreflect.FileDescriptor + +var file_transport_internet_request_assembler_simple_config_proto_rawDesc = []byte{ + 0x0a, 0x38, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2f, 0x61, 0x73, 0x73, + 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x72, 0x2f, 0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x36, 0x76, 0x32, 0x72, 0x61, + 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x69, 0x6d, 0x70, + 0x6c, 0x65, 0x1a, 0x20, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x65, 0x78, 0x74, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xac, 0x03, 0x0a, 0x0c, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x5f, 0x77, 0x72, 0x69, + 0x74, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x6d, + 0x61, 0x78, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x77, + 0x61, 0x69, 0x74, 0x5f, 0x73, 0x75, 0x62, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x74, 0x5f, 0x77, + 0x72, 0x69, 0x74, 0x65, 0x5f, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x77, + 0x61, 0x69, 0x74, 0x53, 0x75, 0x62, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x74, 0x57, 0x72, 0x69, + 0x74, 0x65, 0x4d, 0x73, 0x12, 0x3d, 0x0a, 0x1b, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x5f, + 0x70, 0x6f, 0x6c, 0x6c, 0x69, 0x6e, 0x67, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, + 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x18, 0x69, 0x6e, 0x69, 0x74, 0x69, + 0x61, 0x6c, 0x50, 0x6f, 0x6c, 0x6c, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, + 0x6c, 0x4d, 0x73, 0x12, 0x35, 0x0a, 0x17, 0x6d, 0x61, 0x78, 0x5f, 0x70, 0x6f, 0x6c, 0x6c, 0x69, + 0x6e, 0x67, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x5f, 0x6d, 0x73, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x6d, 0x61, 0x78, 0x50, 0x6f, 0x6c, 0x6c, 0x69, 0x6e, 0x67, + 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4d, 0x73, 0x12, 0x35, 0x0a, 0x17, 0x6d, 0x69, + 0x6e, 0x5f, 0x70, 0x6f, 0x6c, 0x6c, 0x69, 0x6e, 0x67, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, + 0x61, 0x6c, 0x5f, 0x6d, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x6d, 0x69, 0x6e, + 0x50, 0x6f, 0x6c, 0x6c, 0x69, 0x6e, 0x67, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4d, + 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x62, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x5f, 0x66, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x02, 0x52, 0x0d, 0x62, 0x61, 0x63, 0x6b, 0x6f, + 0x66, 0x66, 0x46, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x37, 0x0a, 0x18, 0x66, 0x61, 0x69, 0x6c, + 0x65, 0x64, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, + 0x6c, 0x5f, 0x6d, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x15, 0x66, 0x61, 0x69, 0x6c, + 0x65, 0x64, 0x52, 0x65, 0x74, 0x72, 0x79, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4d, + 0x73, 0x3a, 0x30, 0x82, 0xb5, 0x18, 0x2c, 0x0a, 0x22, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, + 0x72, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x6d, + 0x62, 0x6c, 0x65, 0x72, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x06, 0x73, 0x69, 0x6d, + 0x70, 0x6c, 0x65, 0x22, 0x66, 0x0a, 0x0c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x5f, 0x77, 0x72, 0x69, 0x74, 0x65, + 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x6d, 0x61, 0x78, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x3a, 0x30, 0x82, 0xb5, 0x18, 0x2c, 0x0a, + 0x22, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x12, 0x06, 0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x42, 0xc3, 0x01, 0x0a, 0x3a, + 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, + 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, + 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x50, 0x01, 0x5a, 0x4a, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, + 0x32, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, + 0x72, 0x2f, 0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0xaa, 0x02, 0x36, 0x56, 0x32, 0x52, 0x61, 0x79, + 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x41, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x72, 0x2e, 0x53, 0x69, 0x6d, 0x70, 0x6c, + 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_transport_internet_request_assembler_simple_config_proto_rawDescOnce sync.Once + file_transport_internet_request_assembler_simple_config_proto_rawDescData = file_transport_internet_request_assembler_simple_config_proto_rawDesc +) + +func file_transport_internet_request_assembler_simple_config_proto_rawDescGZIP() []byte { + file_transport_internet_request_assembler_simple_config_proto_rawDescOnce.Do(func() { + file_transport_internet_request_assembler_simple_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_request_assembler_simple_config_proto_rawDescData) + }) + return file_transport_internet_request_assembler_simple_config_proto_rawDescData +} + +var file_transport_internet_request_assembler_simple_config_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_transport_internet_request_assembler_simple_config_proto_goTypes = []interface{}{ + (*ClientConfig)(nil), // 0: v2ray.core.transport.internet.request.assembler.simple.ClientConfig + (*ServerConfig)(nil), // 1: v2ray.core.transport.internet.request.assembler.simple.ServerConfig +} +var file_transport_internet_request_assembler_simple_config_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_transport_internet_request_assembler_simple_config_proto_init() } +func file_transport_internet_request_assembler_simple_config_proto_init() { + if File_transport_internet_request_assembler_simple_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_transport_internet_request_assembler_simple_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClientConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_transport_internet_request_assembler_simple_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_request_assembler_simple_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_transport_internet_request_assembler_simple_config_proto_goTypes, + DependencyIndexes: file_transport_internet_request_assembler_simple_config_proto_depIdxs, + MessageInfos: file_transport_internet_request_assembler_simple_config_proto_msgTypes, + }.Build() + File_transport_internet_request_assembler_simple_config_proto = out.File + file_transport_internet_request_assembler_simple_config_proto_rawDesc = nil + file_transport_internet_request_assembler_simple_config_proto_goTypes = nil + file_transport_internet_request_assembler_simple_config_proto_depIdxs = nil +} diff --git a/transport/internet/request/assembler/simple/config.proto b/transport/internet/request/assembler/simple/config.proto new file mode 100644 index 000000000..c48b11779 --- /dev/null +++ b/transport/internet/request/assembler/simple/config.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package v2ray.core.transport.internet.request.assembler.simple; +option csharp_namespace = "V2Ray.Core.Transport.Internet.Request.Assembler.Simple"; +option go_package = "github.com/v2fly/v2ray-core/v5/transport/internet/request/assembler/simple"; +option java_package = "com.v2ray.core.transport.internet.request.assembler.simple"; +option java_multiple_files = true; + +import "common/protoext/extensions.proto"; + +message ClientConfig { + option (v2ray.core.common.protoext.message_opt).type = "transport.request.assembler.client"; + option (v2ray.core.common.protoext.message_opt).short_name = "simple"; + + int32 max_write_size = 1; + int32 wait_subsequent_write_ms = 2; + int32 initial_polling_interval_ms = 3; + int32 max_polling_interval_ms = 4; + int32 min_polling_interval_ms = 5; + float backoff_factor = 6; + int32 failed_retry_interval_ms = 7; +} + +message ServerConfig { + option (v2ray.core.common.protoext.message_opt).type = "transport.request.assembler.server"; + option (v2ray.core.common.protoext.message_opt).short_name = "simple"; + + int32 max_write_size = 1; +} \ No newline at end of file diff --git a/transport/internet/request/assembler/simple/errors.generated.go b/transport/internet/request/assembler/simple/errors.generated.go new file mode 100644 index 000000000..7ca71c8e0 --- /dev/null +++ b/transport/internet/request/assembler/simple/errors.generated.go @@ -0,0 +1,9 @@ +package simple + +import "github.com/v2fly/v2ray-core/v5/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/request/assembler/simple/server.go b/transport/internet/request/assembler/simple/server.go new file mode 100644 index 000000000..696e73234 --- /dev/null +++ b/transport/internet/request/assembler/simple/server.go @@ -0,0 +1,137 @@ +package simple + +import ( + "bytes" + "context" + "sync" + + "github.com/v2fly/v2ray-core/v5/common" + + "github.com/v2fly/v2ray-core/v5/transport/internet/request" +) + +func newServer(config *ServerConfig) request.SessionAssemblerServer { + return &simpleAssemblerServer{} +} + +type simpleAssemblerServer struct { + sessions sync.Map + assembly request.TransportServerAssembly +} + +func (s *simpleAssemblerServer) OnTransportServerAssemblyReady(assembly request.TransportServerAssembly) { + s.assembly = assembly +} + +func (s *simpleAssemblerServer) OnRoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption, +) (resp request.Response, err error) { + connectionID := req.ConnectionTag + session := newSimpleAssemblerServerSession(ctx) + loadedSession, loaded := s.sessions.LoadOrStore(string(connectionID), session) + if loaded { + session = loadedSession.(*simpleAssemblerServerSession) + } else { + if err := s.assembly.SessionReceiver().OnNewSession(ctx, session); err != nil { + return request.Response{}, newError("failed to create new session").Base(err) + } + } + return session.OnRoundTrip(ctx, req, opts...) +} + +func newSimpleAssemblerServerSession(ctx context.Context) *simpleAssemblerServerSession { + sessionCtx, finish := context.WithCancel(ctx) + return &simpleAssemblerServerSession{ + readBuffer: bytes.NewBuffer(nil), + readChan: make(chan []byte, 16), + requestProcessed: make(chan struct{}), + writeLock: new(sync.Mutex), + writeBuffer: bytes.NewBuffer(nil), + maxWriteSize: 4096, + ctx: sessionCtx, + finish: finish, + } +} + +type simpleAssemblerServerSession struct { + maxWriteSize int + + readBuffer *bytes.Buffer + readChan chan []byte + requestProcessed chan struct{} + + writeLock *sync.Mutex + writeBuffer *bytes.Buffer + + ctx context.Context + finish func() +} + +func (s *simpleAssemblerServerSession) Read(p []byte) (n int, err error) { + if s.readBuffer.Len() == 0 { + select { + case <-s.ctx.Done(): + return 0, s.ctx.Err() + case data := <-s.readChan: + s.readBuffer.Write(data) + } + } + return s.readBuffer.Read(p) +} + +func (s *simpleAssemblerServerSession) Write(p []byte) (n int, err error) { + s.writeLock.Lock() + + n, err = s.writeBuffer.Write(p) + length := s.writeBuffer.Len() + s.writeLock.Unlock() + if err != nil { + return 0, err + } + if length > s.maxWriteSize { + select { + case <-s.requestProcessed: + case <-s.ctx.Done(): + return 0, s.ctx.Err() + } + } + return +} + +func (s *simpleAssemblerServerSession) Close() error { + s.finish() + return nil +} + +func (s *simpleAssemblerServerSession) OnRoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption, +) (resp request.Response, err error) { + if req.Data != nil && len(req.Data) > 0 { + select { + case <-s.ctx.Done(): + return request.Response{}, s.ctx.Err() + case s.readChan <- req.Data: + } + } + + s.writeLock.Lock() + nextWrite := s.writeBuffer.Next(s.maxWriteSize) + data := make([]byte, len(nextWrite)) + copy(data, nextWrite) + s.writeLock.Unlock() + select { + case s.requestProcessed <- struct{}{}: + case <-s.ctx.Done(): + return request.Response{}, s.ctx.Err() + default: + } + return request.Response{Data: data}, nil +} + +func init() { + common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + serverConfig, ok := config.(*ServerConfig) + if !ok { + return nil, newError("not a SimpleServerConfig") + } + return newServer(serverConfig), nil + })) +} diff --git a/transport/internet/request/assembler/simple/simple.go b/transport/internet/request/assembler/simple/simple.go new file mode 100644 index 000000000..2a92860f6 --- /dev/null +++ b/transport/internet/request/assembler/simple/simple.go @@ -0,0 +1,3 @@ +package simple + +//go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen diff --git a/transport/internet/request/assembly/assembly.go b/transport/internet/request/assembly/assembly.go new file mode 100644 index 000000000..22d6df819 --- /dev/null +++ b/transport/internet/request/assembly/assembly.go @@ -0,0 +1,17 @@ +package assembly + +import ( + "context" + + "github.com/v2fly/v2ray-core/v5/common" +) + +//go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen + +const protocolName = "request" + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + return nil, newError("request is a transport protocol.") + })) +} diff --git a/transport/internet/request/assembly/config.pb.go b/transport/internet/request/assembly/config.pb.go new file mode 100644 index 000000000..036907e39 --- /dev/null +++ b/transport/internet/request/assembly/config.pb.go @@ -0,0 +1,174 @@ +package assembly + +import ( + _ "github.com/v2fly/v2ray-core/v5/common/protoext" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Assembler *anypb.Any `protobuf:"bytes,1,opt,name=assembler,proto3" json:"assembler,omitempty"` + Roundtripper *anypb.Any `protobuf:"bytes,2,opt,name=roundtripper,proto3" json:"roundtripper,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_request_assembly_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_request_assembly_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_transport_internet_request_assembly_config_proto_rawDescGZIP(), []int{0} +} + +func (x *Config) GetAssembler() *anypb.Any { + if x != nil { + return x.Assembler + } + return nil +} + +func (x *Config) GetRoundtripper() *anypb.Any { + if x != nil { + return x.Roundtripper + } + return nil +} + +var File_transport_internet_request_assembly_config_proto protoreflect.FileDescriptor + +var file_transport_internet_request_assembly_config_proto_rawDesc = []byte{ + 0x0a, 0x30, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2f, 0x61, 0x73, 0x73, + 0x65, 0x6d, 0x62, 0x6c, 0x79, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, + 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, + 0x6c, 0x79, 0x1a, 0x20, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x65, 0x78, 0x74, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x90, 0x01, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x32, 0x0a, 0x09, 0x61, 0x73, + 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x41, 0x6e, 0x79, 0x52, 0x09, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x72, 0x12, 0x38, + 0x0a, 0x0c, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x0c, 0x72, 0x6f, 0x75, 0x6e, + 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x3a, 0x18, 0x82, 0xb5, 0x18, 0x14, 0x0a, 0x09, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x42, 0xab, 0x01, 0x0a, 0x32, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, + 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x79, 0x50, 0x01, 0x5a, 0x42, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, + 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, 0x6e, + 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x79, 0xaa, + 0x02, 0x2e, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41, 0x73, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x79, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_transport_internet_request_assembly_config_proto_rawDescOnce sync.Once + file_transport_internet_request_assembly_config_proto_rawDescData = file_transport_internet_request_assembly_config_proto_rawDesc +) + +func file_transport_internet_request_assembly_config_proto_rawDescGZIP() []byte { + file_transport_internet_request_assembly_config_proto_rawDescOnce.Do(func() { + file_transport_internet_request_assembly_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_request_assembly_config_proto_rawDescData) + }) + return file_transport_internet_request_assembly_config_proto_rawDescData +} + +var file_transport_internet_request_assembly_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_transport_internet_request_assembly_config_proto_goTypes = []interface{}{ + (*Config)(nil), // 0: v2ray.core.transport.internet.request.assembly.Config + (*anypb.Any)(nil), // 1: google.protobuf.Any +} +var file_transport_internet_request_assembly_config_proto_depIdxs = []int32{ + 1, // 0: v2ray.core.transport.internet.request.assembly.Config.assembler:type_name -> google.protobuf.Any + 1, // 1: v2ray.core.transport.internet.request.assembly.Config.roundtripper:type_name -> google.protobuf.Any + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_transport_internet_request_assembly_config_proto_init() } +func file_transport_internet_request_assembly_config_proto_init() { + if File_transport_internet_request_assembly_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_transport_internet_request_assembly_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_request_assembly_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_transport_internet_request_assembly_config_proto_goTypes, + DependencyIndexes: file_transport_internet_request_assembly_config_proto_depIdxs, + MessageInfos: file_transport_internet_request_assembly_config_proto_msgTypes, + }.Build() + File_transport_internet_request_assembly_config_proto = out.File + file_transport_internet_request_assembly_config_proto_rawDesc = nil + file_transport_internet_request_assembly_config_proto_goTypes = nil + file_transport_internet_request_assembly_config_proto_depIdxs = nil +} diff --git a/transport/internet/request/assembly/config.proto b/transport/internet/request/assembly/config.proto new file mode 100644 index 000000000..4004b23c3 --- /dev/null +++ b/transport/internet/request/assembly/config.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package v2ray.core.transport.internet.request.assembly; +option csharp_namespace = "V2Ray.Core.Transport.Internet.Request.Assembly"; +option go_package = "github.com/v2fly/v2ray-core/v5/transport/internet/request/assembly"; +option java_package = "com.v2ray.core.transport.internet.request.assembly"; +option java_multiple_files = true; + +import "common/protoext/extensions.proto"; +import "google/protobuf/any.proto"; + +message Config { + option (v2ray.core.common.protoext.message_opt).type = "transport"; + option (v2ray.core.common.protoext.message_opt).short_name = "request"; + + google.protobuf.Any assembler = 1; + google.protobuf.Any roundtripper = 2; +} \ No newline at end of file diff --git a/transport/internet/request/assembly/dialer.go b/transport/internet/request/assembly/dialer.go new file mode 100644 index 000000000..80d75bf1f --- /dev/null +++ b/transport/internet/request/assembly/dialer.go @@ -0,0 +1,125 @@ +package assembly + +import ( + "context" + gonet "net" + "time" + + "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon" + + "github.com/v2fly/v2ray-core/v5/common" + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/common/serial" + "github.com/v2fly/v2ray-core/v5/common/session" + "github.com/v2fly/v2ray-core/v5/transport/internet" + "github.com/v2fly/v2ray-core/v5/transport/internet/request" +) + +type client struct { + tripper request.RoundTripperClient + assembler request.SessionAssemblerClient + + streamSettings *internet.MemoryStreamConfig + dest net.Destination +} + +func (c client) Dial(ctx context.Context) (net.Conn, error) { + return transportcommon.DialWithSecuritySettings(ctx, c.dest, c.streamSettings) +} + +func (c client) AutoImplDialer() request.Dialer { + return c +} + +func (c client) Tripper() request.Tripper { + return c.tripper +} + +func (c client) dialRequestSession(ctx context.Context) (net.Conn, error) { + session, err := c.assembler.NewSession(ctx) + if err != nil { + return nil, newError("failed to create new session").Base(err) + } + return clientConnection{session}, nil +} + +type clientConnection struct { + request.Session +} + +func (c clientConnection) LocalAddr() gonet.Addr { + return &net.UnixAddr{Name: "unimplemented"} +} + +func (c clientConnection) RemoteAddr() gonet.Addr { + return &net.UnixAddr{Name: "unimplemented"} +} + +func (c clientConnection) SetDeadline(t time.Time) error { + // Unimplemented + return nil +} + +func (c clientConnection) SetReadDeadline(t time.Time) error { + // Unimplemented + return nil +} + +func (c clientConnection) SetWriteDeadline(t time.Time) error { + // Unimplemented + return nil +} + +func dialRequest(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) { + clientAssembly := &client{} + transportConfiguration := streamSettings.ProtocolSettings.(*Config) + + assemblerConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Assembler) + if err != nil { + return nil, newError("failed to get config instance of assembler").Base(err) + } + assembler, err := common.CreateObject(ctx, assemblerConfigInstance) + if err != nil { + return nil, newError("failed to create assembler").Base(err) + } + if typedAssembler, ok := assembler.(request.SessionAssemblerClient); !ok { + return nil, newError("failed to type assert assembler to SessionAssemblerClient") + } else { + clientAssembly.assembler = typedAssembler + } + + roundtripperConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Roundtripper) + if err != nil { + return nil, newError("failed to get config instance of roundtripper").Base(err) + } + roundtripper, err := common.CreateObject(ctx, roundtripperConfigInstance) + if err != nil { + return nil, newError("failed to create roundtripper").Base(err) + } + if typedRoundtripper, ok := roundtripper.(request.RoundTripperClient); !ok { + return nil, newError("failed to type assert roundtripper to RoundTripperClient") + } else { + clientAssembly.tripper = typedRoundtripper + } + + clientAssembly.streamSettings = streamSettings + clientAssembly.dest = dest + + clientAssembly.assembler.OnTransportClientAssemblyReady(clientAssembly) + clientAssembly.tripper.OnTransportClientAssemblyReady(clientAssembly) + return clientAssembly.dialRequestSession(ctx) +} + +func dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) { + newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx)) + + conn, err := dialRequest(ctx, dest, streamSettings) + if err != nil { + return nil, newError("failed to dial request to ", dest).Base(err) + } + return internet.Connection(conn), nil +} + +func init() { + common.Must(internet.RegisterTransportDialer(protocolName, dial)) +} diff --git a/transport/internet/request/assembly/errors.generated.go b/transport/internet/request/assembly/errors.generated.go new file mode 100644 index 000000000..48ffbedcc --- /dev/null +++ b/transport/internet/request/assembly/errors.generated.go @@ -0,0 +1,9 @@ +package assembly + +import "github.com/v2fly/v2ray-core/v5/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/request/assembly/hub.go b/transport/internet/request/assembly/hub.go new file mode 100644 index 000000000..7a6fc404f --- /dev/null +++ b/transport/internet/request/assembly/hub.go @@ -0,0 +1,135 @@ +package assembly + +import ( + "context" + gonet "net" + "time" + + "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon" + + "github.com/v2fly/v2ray-core/v5/common" + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/common/serial" + "github.com/v2fly/v2ray-core/v5/transport/internet" + "github.com/v2fly/v2ray-core/v5/transport/internet/request" +) + +type server struct { + tripper request.RoundTripperServer + assembler request.SessionAssemblerServer + addConn internet.ConnHandler + + streamSettings *internet.MemoryStreamConfig + addr net.Address + port net.Port +} + +func (s server) Listen(ctx context.Context) (net.Listener, error) { + return transportcommon.ListenWithSecuritySettings(ctx, s.addr, s.port, s.streamSettings) +} + +func (s server) AutoImplListener() request.Listener { + return s +} + +func (s server) Close() error { + if err := s.tripper.Close(); err != nil { + return newError("failed to close tripper").Base(err) + } + return nil +} + +func (s server) Addr() net.Addr { + // Unimplemented + return nil +} + +type serverConnection struct { + request.Session +} + +func (s serverConnection) LocalAddr() gonet.Addr { + return &net.UnixAddr{Name: "unimplemented"} +} + +func (s serverConnection) RemoteAddr() gonet.Addr { + return &net.UnixAddr{Name: "unimplemented"} +} + +func (s serverConnection) SetDeadline(t time.Time) error { + // Unimplemented + return nil +} + +func (s serverConnection) SetReadDeadline(t time.Time) error { + // Unimplemented + return nil +} + +func (s serverConnection) SetWriteDeadline(t time.Time) error { + // Unimplemented + return nil +} + +func (s server) OnNewSession(ctx context.Context, sess request.Session, opts ...request.SessionOption) error { + s.addConn(&serverConnection{sess}) + return nil +} + +func (s server) SessionReceiver() request.SessionReceiver { + return s +} + +func (s server) TripperReceiver() request.TripperReceiver { + return s.assembler +} + +func listenRequest(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) { + transportConfiguration := streamSettings.ProtocolSettings.(*Config) + serverAssembly := &server{addConn: addConn} + + assemblerConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Assembler) + if err != nil { + return nil, newError("failed to get config instance of assembler").Base(err) + } + assembler, err := common.CreateObject(ctx, assemblerConfigInstance) + if err != nil { + return nil, newError("failed to create assembler").Base(err) + } + if typedAssembler, ok := assembler.(request.SessionAssemblerServer); !ok { + return nil, newError("failed to type assert assembler to SessionAssemblerServer") + } else { + serverAssembly.assembler = typedAssembler + } + + roundtripperConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Roundtripper) + if err != nil { + return nil, newError("failed to get config instance of roundtripper").Base(err) + } + roundtripper, err := common.CreateObject(ctx, roundtripperConfigInstance) + if err != nil { + return nil, newError("failed to create roundtripper").Base(err) + } + if typedRoundtripper, ok := roundtripper.(request.RoundTripperServer); !ok { + return nil, newError("failed to type assert roundtripper to RoundTripperServer") + } else { + serverAssembly.tripper = typedRoundtripper + } + + serverAssembly.addr = address + serverAssembly.port = port + serverAssembly.streamSettings = streamSettings + + serverAssembly.assembler.OnTransportServerAssemblyReady(serverAssembly) + serverAssembly.tripper.OnTransportServerAssemblyReady(serverAssembly) + + if err := serverAssembly.tripper.Start(); err != nil { + return nil, newError("failed to start tripper").Base(err) + } + + return serverAssembly, nil +} + +func init() { + common.Must(internet.RegisterTransportListener(protocolName, listenRequest)) +} diff --git a/transport/internet/request/options.go b/transport/internet/request/options.go new file mode 100644 index 000000000..725b8fc21 --- /dev/null +++ b/transport/internet/request/options.go @@ -0,0 +1 @@ +package request diff --git a/transport/internet/request/request.go b/transport/internet/request/request.go new file mode 100644 index 000000000..b78418898 --- /dev/null +++ b/transport/internet/request/request.go @@ -0,0 +1,42 @@ +package request + +import ( + "context" + + "github.com/v2fly/v2ray-core/v5/common/net" +) + +type TransportClientAssembly interface { + Tripper() Tripper + AutoImplDialer() Dialer +} + +type TransportClientAssemblyReceiver interface { + OnTransportClientAssemblyReady(TransportClientAssembly) +} + +type TransportServerAssembly interface { + TripperReceiver() TripperReceiver + SessionReceiver() SessionReceiver + AutoImplListener() Listener +} + +type TransportServerAssemblyReceiver interface { + OnTransportServerAssemblyReady(TransportServerAssembly) +} + +type SessionCreator interface { + NewSession(ctx context.Context, opts ...SessionOption) (Session, error) +} + +type SessionReceiver interface { + OnNewSession(ctx context.Context, sess Session, opts ...SessionOption) error +} + +type Dialer interface { + Dial(ctx context.Context) (net.Conn, error) +} + +type Listener interface { + Listen(ctx context.Context) (net.Listener, error) +} diff --git a/transport/internet/request/roundtripper.go b/transport/internet/request/roundtripper.go new file mode 100644 index 000000000..abb8f2f89 --- /dev/null +++ b/transport/internet/request/roundtripper.go @@ -0,0 +1,38 @@ +package request + +import ( + "context" + + "github.com/v2fly/v2ray-core/v5/common" +) + +type RoundTripperClient interface { + Tripper + TransportClientAssemblyReceiver +} + +type RoundTripperServer interface { + common.Runnable + TransportServerAssemblyReceiver +} + +type Tripper interface { + RoundTrip(ctx context.Context, req Request, opts ...RoundTripperOption) (resp Response, err error) +} + +type TripperReceiver interface { + OnRoundTrip(ctx context.Context, req Request, opts ...RoundTripperOption) (resp Response, err error) +} + +type RoundTripperOption interface { + RoundTripperOption() +} + +type Request struct { + Data []byte + ConnectionTag []byte +} + +type Response struct { + Data []byte +} diff --git a/transport/internet/request/roundtripper/httprt/config.pb.go b/transport/internet/request/roundtripper/httprt/config.pb.go new file mode 100644 index 000000000..a93feb0a9 --- /dev/null +++ b/transport/internet/request/roundtripper/httprt/config.pb.go @@ -0,0 +1,320 @@ +package httprt + +import ( + _ "github.com/v2fly/v2ray-core/v5/common/protoext" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ClientConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Http *HTTPConfig `protobuf:"bytes,1,opt,name=http,proto3" json:"http,omitempty"` +} + +func (x *ClientConfig) Reset() { + *x = ClientConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClientConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClientConfig) ProtoMessage() {} + +func (x *ClientConfig) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClientConfig.ProtoReflect.Descriptor instead. +func (*ClientConfig) Descriptor() ([]byte, []int) { + return file_transport_internet_request_roundtripper_httprt_config_proto_rawDescGZIP(), []int{0} +} + +func (x *ClientConfig) GetHttp() *HTTPConfig { + if x != nil { + return x.Http + } + return nil +} + +type ServerConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Http *HTTPConfig `protobuf:"bytes,1,opt,name=http,proto3" json:"http,omitempty"` + NoDecodingSessionTag bool `protobuf:"varint,2,opt,name=no_decoding_session_tag,json=noDecodingSessionTag,proto3" json:"no_decoding_session_tag,omitempty"` +} + +func (x *ServerConfig) Reset() { + *x = ServerConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerConfig) ProtoMessage() {} + +func (x *ServerConfig) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerConfig.ProtoReflect.Descriptor instead. +func (*ServerConfig) Descriptor() ([]byte, []int) { + return file_transport_internet_request_roundtripper_httprt_config_proto_rawDescGZIP(), []int{1} +} + +func (x *ServerConfig) GetHttp() *HTTPConfig { + if x != nil { + return x.Http + } + return nil +} + +func (x *ServerConfig) GetNoDecodingSessionTag() bool { + if x != nil { + return x.NoDecodingSessionTag + } + return false +} + +type HTTPConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + UrlPrefix string `protobuf:"bytes,2,opt,name=urlPrefix,proto3" json:"urlPrefix,omitempty"` +} + +func (x *HTTPConfig) Reset() { + *x = HTTPConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HTTPConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HTTPConfig) ProtoMessage() {} + +func (x *HTTPConfig) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HTTPConfig.ProtoReflect.Descriptor instead. +func (*HTTPConfig) Descriptor() ([]byte, []int) { + return file_transport_internet_request_roundtripper_httprt_config_proto_rawDescGZIP(), []int{2} +} + +func (x *HTTPConfig) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *HTTPConfig) GetUrlPrefix() string { + if x != nil { + return x.UrlPrefix + } + return "" +} + +var File_transport_internet_request_roundtripper_httprt_config_proto protoreflect.FileDescriptor + +var file_transport_internet_request_roundtripper_httprt_config_proto_rawDesc = []byte{ + 0x0a, 0x3b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x6f, 0x75, + 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2f, 0x68, 0x74, 0x74, 0x70, 0x72, 0x74, + 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x39, 0x76, + 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, + 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x2e, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, + 0x72, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x72, 0x74, 0x1a, 0x20, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x65, 0x78, 0x74, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, + 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x0c, 0x43, + 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x59, 0x0a, 0x04, 0x68, + 0x74, 0x74, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x45, 0x2e, 0x76, 0x32, 0x72, 0x61, + 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x2e, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x68, + 0x74, 0x74, 0x70, 0x72, 0x74, 0x2e, 0x48, 0x54, 0x54, 0x50, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x52, 0x04, 0x68, 0x74, 0x74, 0x70, 0x3a, 0x33, 0x82, 0xb5, 0x18, 0x2f, 0x0a, 0x25, 0x74, 0x72, + 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x63, 0x6c, 0x69, + 0x65, 0x6e, 0x74, 0x12, 0x06, 0x68, 0x74, 0x74, 0x70, 0x72, 0x74, 0x22, 0xd5, 0x01, 0x0a, 0x0c, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x59, 0x0a, 0x04, + 0x68, 0x74, 0x74, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x45, 0x2e, 0x76, 0x32, 0x72, + 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, + 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, + 0x68, 0x74, 0x74, 0x70, 0x72, 0x74, 0x2e, 0x48, 0x54, 0x54, 0x50, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x52, 0x04, 0x68, 0x74, 0x74, 0x70, 0x12, 0x35, 0x0a, 0x17, 0x6e, 0x6f, 0x5f, 0x64, 0x65, + 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x74, + 0x61, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x14, 0x6e, 0x6f, 0x44, 0x65, 0x63, 0x6f, + 0x64, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x54, 0x61, 0x67, 0x3a, 0x33, + 0x82, 0xb5, 0x18, 0x2f, 0x0a, 0x25, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, + 0x70, 0x70, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x06, 0x68, 0x74, 0x74, + 0x70, 0x72, 0x74, 0x22, 0x3e, 0x0a, 0x0a, 0x48, 0x54, 0x54, 0x50, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x75, 0x72, 0x6c, 0x50, 0x72, 0x65, 0x66, + 0x69, 0x78, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x72, 0x6c, 0x50, 0x72, 0x65, + 0x66, 0x69, 0x78, 0x42, 0xcc, 0x01, 0x0a, 0x3d, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, + 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x2e, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x68, + 0x74, 0x74, 0x70, 0x72, 0x74, 0x50, 0x01, 0x5a, 0x4d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2d, + 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, + 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2f, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2f, + 0x68, 0x74, 0x74, 0x70, 0x72, 0x74, 0xaa, 0x02, 0x39, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, + 0x6f, 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, + 0x6f, 0x75, 0x6e, 0x64, 0x74, 0x72, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x68, 0x74, 0x74, 0x70, + 0x72, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_transport_internet_request_roundtripper_httprt_config_proto_rawDescOnce sync.Once + file_transport_internet_request_roundtripper_httprt_config_proto_rawDescData = file_transport_internet_request_roundtripper_httprt_config_proto_rawDesc +) + +func file_transport_internet_request_roundtripper_httprt_config_proto_rawDescGZIP() []byte { + file_transport_internet_request_roundtripper_httprt_config_proto_rawDescOnce.Do(func() { + file_transport_internet_request_roundtripper_httprt_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_request_roundtripper_httprt_config_proto_rawDescData) + }) + return file_transport_internet_request_roundtripper_httprt_config_proto_rawDescData +} + +var file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_transport_internet_request_roundtripper_httprt_config_proto_goTypes = []interface{}{ + (*ClientConfig)(nil), // 0: v2ray.core.transport.internet.request.roundtripper.httprt.ClientConfig + (*ServerConfig)(nil), // 1: v2ray.core.transport.internet.request.roundtripper.httprt.ServerConfig + (*HTTPConfig)(nil), // 2: v2ray.core.transport.internet.request.roundtripper.httprt.HTTPConfig +} +var file_transport_internet_request_roundtripper_httprt_config_proto_depIdxs = []int32{ + 2, // 0: v2ray.core.transport.internet.request.roundtripper.httprt.ClientConfig.http:type_name -> v2ray.core.transport.internet.request.roundtripper.httprt.HTTPConfig + 2, // 1: v2ray.core.transport.internet.request.roundtripper.httprt.ServerConfig.http:type_name -> v2ray.core.transport.internet.request.roundtripper.httprt.HTTPConfig + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_transport_internet_request_roundtripper_httprt_config_proto_init() } +func file_transport_internet_request_roundtripper_httprt_config_proto_init() { + if File_transport_internet_request_roundtripper_httprt_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClientConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HTTPConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_request_roundtripper_httprt_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_transport_internet_request_roundtripper_httprt_config_proto_goTypes, + DependencyIndexes: file_transport_internet_request_roundtripper_httprt_config_proto_depIdxs, + MessageInfos: file_transport_internet_request_roundtripper_httprt_config_proto_msgTypes, + }.Build() + File_transport_internet_request_roundtripper_httprt_config_proto = out.File + file_transport_internet_request_roundtripper_httprt_config_proto_rawDesc = nil + file_transport_internet_request_roundtripper_httprt_config_proto_goTypes = nil + file_transport_internet_request_roundtripper_httprt_config_proto_depIdxs = nil +} diff --git a/transport/internet/request/roundtripper/httprt/config.proto b/transport/internet/request/roundtripper/httprt/config.proto new file mode 100644 index 000000000..ac2df25d6 --- /dev/null +++ b/transport/internet/request/roundtripper/httprt/config.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +package v2ray.core.transport.internet.request.roundtripper.httprt; +option csharp_namespace = "V2Ray.Core.Transport.Internet.Request.Roundtripper.httprt"; +option go_package = "github.com/v2fly/v2ray-core/v5/transport/internet/request/roundtripper/httprt"; +option java_package = "com.v2ray.core.transport.internet.request.roundtripper.httprt"; +option java_multiple_files = true; + +import "common/protoext/extensions.proto"; + +message ClientConfig { + option (v2ray.core.common.protoext.message_opt).type = "transport.request.roundtripper.client"; + option (v2ray.core.common.protoext.message_opt).short_name = "httprt"; + + HTTPConfig http = 1; +} + +message ServerConfig { + option (v2ray.core.common.protoext.message_opt).type = "transport.request.roundtripper.server"; + option (v2ray.core.common.protoext.message_opt).short_name = "httprt"; + HTTPConfig http = 1; + bool no_decoding_session_tag = 2; +} + +message HTTPConfig { + string path = 1; + string urlPrefix = 2; +} \ No newline at end of file diff --git a/transport/internet/request/roundtripper/httprt/errors.generated.go b/transport/internet/request/roundtripper/httprt/errors.generated.go new file mode 100644 index 000000000..69a8bfd75 --- /dev/null +++ b/transport/internet/request/roundtripper/httprt/errors.generated.go @@ -0,0 +1,9 @@ +package httprt + +import "github.com/v2fly/v2ray-core/v5/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/request/roundtripper/httprt/httprt.go b/transport/internet/request/roundtripper/httprt/httprt.go new file mode 100644 index 000000000..a4968befc --- /dev/null +++ b/transport/internet/request/roundtripper/httprt/httprt.go @@ -0,0 +1,146 @@ +package httprt + +//go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen + +import ( + "bytes" + "context" + "encoding/base64" + "io" + "net/http" + + "github.com/v2fly/v2ray-core/v5/common" + + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/transport/internet/request" +) + +func newHTTPRoundTripperClient(ctx context.Context, config *ClientConfig) request.RoundTripperClient { + return &httpTripperClient{config: config} +} + +type httpTripperClient struct { + httpRTT http.RoundTripper + config *ClientConfig + assembly request.TransportClientAssembly +} + +func (h *httpTripperClient) OnTransportClientAssemblyReady(assembly request.TransportClientAssembly) { + h.assembly = assembly +} + +func (h *httpTripperClient) RoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption) (resp request.Response, err error) { + if h.httpRTT == nil { + h.httpRTT = &http.Transport{ + DialContext: func(dialCtx context.Context, network, addr string) (net.Conn, error) { + return h.assembly.AutoImplDialer().Dial(ctx) + }, + DialTLSContext: func(dialCtx context.Context, network, addr string) (net.Conn, error) { + return h.assembly.AutoImplDialer().Dial(ctx) + }, + } + } + + connectionTagStr := base64.RawURLEncoding.EncodeToString(req.ConnectionTag) + + httpRequest, err := http.NewRequest("POST", h.config.Http.UrlPrefix+h.config.Http.Path, bytes.NewReader(req.Data)) + if err != nil { + return + } + + httpRequest.Header.Set("X-Session-ID", connectionTagStr) + + httpResp, err := h.httpRTT.RoundTrip(httpRequest) + if err != nil { + return + } + defer httpResp.Body.Close() + result, err := io.ReadAll(httpResp.Body) + if err != nil { + return + } + return request.Response{Data: result}, err +} + +func newHTTPRoundTripperServer(ctx context.Context, config *ServerConfig) request.RoundTripperServer { + return &httpTripperServer{ctx: ctx, config: config} +} + +type httpTripperServer struct { + ctx context.Context + listener net.Listener + assembly request.TransportServerAssembly + + listenAddress net.Addr + config *ServerConfig +} + +func (h *httpTripperServer) OnTransportServerAssemblyReady(assembly request.TransportServerAssembly) { + h.assembly = assembly +} + +func (h *httpTripperServer) ServeHTTP(writer http.ResponseWriter, r *http.Request) { + h.onRequest(writer, r) +} + +func (h *httpTripperServer) onRequest(resp http.ResponseWriter, req *http.Request) { + tail := req.Header.Get("X-Session-ID") + data := []byte(tail) + if !h.config.NoDecodingSessionTag { + decodedData, err := base64.RawURLEncoding.DecodeString(tail) + if err != nil { + newError("unable to decode tag").Base(err).AtInfo().WriteToLog() + return + } + data = decodedData + } + body, err := io.ReadAll(req.Body) + req.Body.Close() + if err != nil { + newError("unable to read body").Base(err).AtInfo().WriteToLog() + } + recvResp, err := h.assembly.TripperReceiver().OnRoundTrip(h.ctx, request.Request{Data: body, ConnectionTag: data}) + if err != nil { + newError("unable to process roundtrip").Base(err).AtInfo().WriteToLog() + } + _, err = io.Copy(resp, bytes.NewReader(recvResp.Data)) + if err != nil { + newError("unable to send response").Base(err).AtInfo().WriteToLog() + } +} + +func (h *httpTripperServer) Start() error { + listener, err := h.assembly.AutoImplListener().Listen(h.ctx) + if err != nil { + return newError("unable to create a listener for http tripper server").Base(err) + } + h.listener = listener + go func() { + err := http.Serve(listener, h) + if err != nil { + newError("unable to serve listener for http tripper server").Base(err).WriteToLog() + } + }() + return nil +} + +func (h *httpTripperServer) Close() error { + return h.listener.Close() +} + +func init() { + common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + clientConfig, ok := config.(*ClientConfig) + if !ok { + return nil, newError("not a ClientConfig") + } + return newHTTPRoundTripperClient(ctx, clientConfig), nil + })) + common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + serverConfig, ok := config.(*ServerConfig) + if !ok { + return nil, newError("not a ServerConfig") + } + return newHTTPRoundTripperServer(ctx, serverConfig), nil + })) +} diff --git a/transport/internet/transportcommon/dialer.go b/transport/internet/transportcommon/dialer.go new file mode 100644 index 000000000..412c0c553 --- /dev/null +++ b/transport/internet/transportcommon/dialer.go @@ -0,0 +1,34 @@ +package transportcommon + +import ( + "context" + + "github.com/v2fly/v2ray-core/v5/transport/internet/security" + + "github.com/v2fly/v2ray-core/v5/common/environment" + "github.com/v2fly/v2ray-core/v5/common/environment/envctx" + + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/transport/internet" +) + +func DialWithSecuritySettings(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) { + transportEnvironment := envctx.EnvironmentFromContext(ctx).(environment.TransportEnvironment) + dialer := transportEnvironment.Dialer() + conn, err := dialer.Dial(ctx, nil, dest, streamSettings.SocketSettings) + if err != nil { + return nil, newError("failed to dial to ", dest).Base(err) + } + securityEngine, err := security.CreateSecurityEngineFromSettings(ctx, streamSettings) + if err != nil { + return nil, newError("unable to create security engine").Base(err) + } + + if securityEngine != nil { + conn, err = securityEngine.Client(conn, security.OptionWithDestination{Dest: dest}) + if err != nil { + return nil, newError("unable to create security protocol client from security engine").Base(err) + } + } + return internet.Connection(conn), nil +} diff --git a/transport/internet/transportcommon/errors.generated.go b/transport/internet/transportcommon/errors.generated.go new file mode 100644 index 000000000..b48153a2e --- /dev/null +++ b/transport/internet/transportcommon/errors.generated.go @@ -0,0 +1,9 @@ +package transportcommon + +import "github.com/v2fly/v2ray-core/v5/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/transportcommon/listener.go b/transport/internet/transportcommon/listener.go new file mode 100644 index 000000000..9c5c6e6a6 --- /dev/null +++ b/transport/internet/transportcommon/listener.go @@ -0,0 +1,62 @@ +package transportcommon + +import ( + "context" + + "github.com/v2fly/v2ray-core/v5/common/environment" + "github.com/v2fly/v2ray-core/v5/common/environment/envctx" + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/common/session" + "github.com/v2fly/v2ray-core/v5/transport/internet" +) + +type combinedListener struct { + net.Listener + locker *internet.FileLocker +} + +func (l *combinedListener) Close() error { + if l.locker != nil { + l.locker.Release() + } + return l.Listener.Close() +} + +func ListenWithSecuritySettings(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig) ( + net.Listener, error) { + var l combinedListener + + transportEnvironment := envctx.EnvironmentFromContext(ctx).(environment.TransportEnvironment) + transportListener := transportEnvironment.Listener() + + if port == net.Port(0) { // unix + listener, err := transportListener.Listen(ctx, &net.UnixAddr{ + Name: address.Domain(), + Net: "unix", + }, streamSettings.SocketSettings) + if err != nil { + return nil, newError("failed to listen unix domain socket on ", address).Base(err) + } + newError("listening unix domain socket on ", address).WriteToLog(session.ExportIDToError(ctx)) + locker := ctx.Value(address.Domain()) + if locker != nil { + l.locker = locker.(*internet.FileLocker) + } + l.Listener = listener + } else { // tcp + listener, err := transportListener.Listen(ctx, &net.TCPAddr{ + IP: address.IP(), + Port: int(port), + }, streamSettings.SocketSettings) + if err != nil { + return nil, newError("failed to listen TCP on ", address, ":", port).Base(err) + } + newError("listening TCP on ", address, ":", port).WriteToLog(session.ExportIDToError(ctx)) + l.Listener = listener + } + + if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.AcceptProxyProtocol { + newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx)) + } + return &l, nil +} diff --git a/transport/internet/transportcommon/tansportcommon.go b/transport/internet/transportcommon/tansportcommon.go new file mode 100644 index 000000000..13d0e143c --- /dev/null +++ b/transport/internet/transportcommon/tansportcommon.go @@ -0,0 +1,3 @@ +package transportcommon + +//go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen