1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-13 21:07:02 -05:00

loopback outbound, allow you to redirect connection to the dispatcher again (#770)

* Added Loop back proxy

* Added json processing for lo proxy

* Fix bug for lo proxy

* Fix bug for lo proxy

* rename the outbound name

* Loopback: update naming and fix lint issues

* Chore: change lo to loopback

Co-authored-by: kslr <kslrwang@gmail.com>
Co-authored-by: loyalsoldier <10487845+Loyalsoldier@users.noreply.github.com>
This commit is contained in:
Xiaokang Wang 2021-03-13 09:25:56 +00:00 committed by GitHub
parent 0ae9ba5247
commit 0e5e5164ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 315 additions and 0 deletions

14
infra/conf/loopback.go Normal file
View File

@ -0,0 +1,14 @@
package conf
import (
"github.com/golang/protobuf/proto"
"github.com/v2fly/v2ray-core/v4/proxy/loopback"
)
type LoopbackConfig struct {
InboundTag string `json:"inboundTag"`
}
func (l LoopbackConfig) Build() (proto.Message, error) {
return &loopback.Config{InboundTag: l.InboundTag}, nil
}

View File

@ -36,6 +36,7 @@ var (
"trojan": func() interface{} { return new(TrojanClientConfig) }, "trojan": func() interface{} { return new(TrojanClientConfig) },
"mtproto": func() interface{} { return new(MTProtoClientConfig) }, "mtproto": func() interface{} { return new(MTProtoClientConfig) },
"dns": func() interface{} { return new(DNSOutboundConfig) }, "dns": func() interface{} { return new(DNSOutboundConfig) },
"loopback": func() interface{} { return new(LoopbackConfig) },
}, "protocol", "settings") }, "protocol", "settings")
ctllog = log.New(os.Stderr, "v2ctl> ", 0) ctllog = log.New(os.Stderr, "v2ctl> ", 0)

3
proxy/loopback/config.go Normal file
View File

@ -0,0 +1,3 @@
package loopback
//go:generate go run github.com/v2fly/v2ray-core/v4/common/errors/errorgen

155
proxy/loopback/config.pb.go Normal file
View File

@ -0,0 +1,155 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
// protoc v3.15.5
// source: proxy/loopback/config.proto
package loopback
import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type Config struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
InboundTag string `protobuf:"bytes,1,opt,name=inbound_tag,json=inboundTag,proto3" json:"inbound_tag,omitempty"`
}
func (x *Config) Reset() {
*x = Config{}
if protoimpl.UnsafeEnabled {
mi := &file_proxy_loopback_config_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Config) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Config) ProtoMessage() {}
func (x *Config) ProtoReflect() protoreflect.Message {
mi := &file_proxy_loopback_config_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Config.ProtoReflect.Descriptor instead.
func (*Config) Descriptor() ([]byte, []int) {
return file_proxy_loopback_config_proto_rawDescGZIP(), []int{0}
}
func (x *Config) GetInboundTag() string {
if x != nil {
return x.InboundTag
}
return ""
}
var File_proxy_loopback_config_proto protoreflect.FileDescriptor
var file_proxy_loopback_config_proto_rawDesc = []byte{
0x0a, 0x1b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x6c, 0x6f, 0x6f, 0x70, 0x62, 0x61, 0x63, 0x6b,
0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x19, 0x76,
0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e,
0x6c, 0x6f, 0x6f, 0x70, 0x62, 0x61, 0x63, 0x6b, 0x22, 0x29, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x74, 0x61,
0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64,
0x54, 0x61, 0x67, 0x42, 0x6c, 0x0a, 0x1d, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79,
0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x6c, 0x6f, 0x6f, 0x70,
0x62, 0x61, 0x63, 0x6b, 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2d, 0x63,
0x6f, 0x72, 0x65, 0x2f, 0x76, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x6c, 0x6f, 0x6f,
0x70, 0x62, 0x61, 0x63, 0x6b, 0xaa, 0x02, 0x19, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, 0x6f,
0x72, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x4c, 0x6f, 0x6f, 0x70, 0x62, 0x61, 0x63,
0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_proxy_loopback_config_proto_rawDescOnce sync.Once
file_proxy_loopback_config_proto_rawDescData = file_proxy_loopback_config_proto_rawDesc
)
func file_proxy_loopback_config_proto_rawDescGZIP() []byte {
file_proxy_loopback_config_proto_rawDescOnce.Do(func() {
file_proxy_loopback_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_proxy_loopback_config_proto_rawDescData)
})
return file_proxy_loopback_config_proto_rawDescData
}
var file_proxy_loopback_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_proxy_loopback_config_proto_goTypes = []interface{}{
(*Config)(nil), // 0: v2ray.core.proxy.loopback.Config
}
var file_proxy_loopback_config_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_proxy_loopback_config_proto_init() }
func file_proxy_loopback_config_proto_init() {
if File_proxy_loopback_config_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_proxy_loopback_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Config); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proxy_loopback_config_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_proxy_loopback_config_proto_goTypes,
DependencyIndexes: file_proxy_loopback_config_proto_depIdxs,
MessageInfos: file_proxy_loopback_config_proto_msgTypes,
}.Build()
File_proxy_loopback_config_proto = out.File
file_proxy_loopback_config_proto_rawDesc = nil
file_proxy_loopback_config_proto_goTypes = nil
file_proxy_loopback_config_proto_depIdxs = nil
}

View File

@ -0,0 +1,11 @@
syntax = "proto3";
package v2ray.core.proxy.loopback;
option csharp_namespace = "V2Ray.Core.Proxy.Loopback";
option go_package = "github.com/v2fly/v2ray-core/v4/proxy/loopback";
option java_package = "com.v2ray.core.proxy.loopback";
option java_multiple_files = true;
message Config {
string inbound_tag = 1;
}

View File

@ -0,0 +1,9 @@
package loopback
import "github.com/v2fly/v2ray-core/v4/common/errors"
type errPathObjHolder struct{}
func newError(values ...interface{}) *errors.Error {
return errors.New(values...).WithPathObj(errPathObjHolder{})
}

122
proxy/loopback/lookback.go Normal file
View File

@ -0,0 +1,122 @@
// +build !confonly
package loopback
import (
"context"
core "github.com/v2fly/v2ray-core/v4"
"github.com/v2fly/v2ray-core/v4/common"
"github.com/v2fly/v2ray-core/v4/common/buf"
"github.com/v2fly/v2ray-core/v4/common/net"
"github.com/v2fly/v2ray-core/v4/common/retry"
"github.com/v2fly/v2ray-core/v4/common/session"
"github.com/v2fly/v2ray-core/v4/common/task"
"github.com/v2fly/v2ray-core/v4/features/routing"
"github.com/v2fly/v2ray-core/v4/transport"
"github.com/v2fly/v2ray-core/v4/transport/internet"
)
type Loopback struct {
config *Config
dispatcherInstance routing.Dispatcher
}
func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet.Dialer) error {
outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.")
}
destination := outbound.Target
newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx))
input := link.Reader
output := link.Writer
var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error {
dialDest := destination
content := new(session.Content)
content.SkipDNSResolve = true
ctx = session.ContextWithContent(ctx, content)
inbound := session.InboundFromContext(ctx)
inbound.Tag = l.config.InboundTag
ctx = session.ContextWithInbound(ctx, inbound)
rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
if err != nil {
return err
}
var readerOpt net.ConnectionOption
if dialDest.Network == net.Network_TCP {
readerOpt = net.ConnectionOutputMulti(rawConn.Reader)
} else {
readerOpt = net.ConnectionOutputMultiUDP(rawConn.Reader)
}
conn = net.NewConnection(net.ConnectionInputMulti(rawConn.Writer), readerOpt)
return nil
})
if err != nil {
return newError("failed to open connection to ", destination).Base(err)
}
defer conn.Close()
requestDone := func() error {
var writer buf.Writer
if destination.Network == net.Network_TCP {
writer = buf.NewWriter(conn)
} else {
writer = &buf.SequentialWriter{Writer: conn}
}
if err := buf.Copy(input, writer); err != nil {
return newError("failed to process request").Base(err)
}
return nil
}
responseDone := func() error {
var reader buf.Reader
if destination.Network == net.Network_TCP {
reader = buf.NewReader(conn)
} else {
reader = buf.NewPacketReader(conn)
}
if err := buf.Copy(reader, output); err != nil {
return newError("failed to process response").Base(err)
}
return nil
}
if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
return newError("connection ends").Base(err)
}
return nil
}
func (l *Loopback) init(config *Config, dispatcherInstance routing.Dispatcher) error {
l.dispatcherInstance = dispatcherInstance
l.config = config
return nil
}
func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
l := new(Loopback)
err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) error {
return l.init(config.(*Config), dispatcherInstance)
})
return l, err
}))
}