1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-07-26 11:44:22 -04:00

add implementation for transport environment and network env

This commit is contained in:
Shelikhoo 2022-09-06 20:22:10 +01:00
parent acbb5e6e08
commit 69ab87239a
No known key found for this signature in database
GPG Key ID: C4D5E79D22B25316
7 changed files with 157 additions and 4 deletions

View File

@ -16,6 +16,7 @@ import (
// Manager is to manage all inbound handlers. // Manager is to manage all inbound handlers.
type Manager struct { type Manager struct {
ctx context.Context
access sync.RWMutex access sync.RWMutex
untaggedHandler []inbound.Handler untaggedHandler []inbound.Handler
taggedHandlers map[string]inbound.Handler taggedHandlers map[string]inbound.Handler
@ -25,6 +26,7 @@ type Manager struct {
// New returns a new Manager for inbound handlers. // New returns a new Manager for inbound handlers.
func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) { func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) {
m := &Manager{ m := &Manager{
ctx: ctx,
taggedHandlers: make(map[string]inbound.Handler), taggedHandlers: make(map[string]inbound.Handler),
} }
return m, nil return m, nil

View File

@ -9,6 +9,8 @@ import (
"github.com/v2fly/v2ray-core/v5/app/proxyman" "github.com/v2fly/v2ray-core/v5/app/proxyman"
"github.com/v2fly/v2ray-core/v5/common" "github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/buf" "github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/environment/envctx"
"github.com/v2fly/v2ray-core/v5/common/net" "github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/serial" "github.com/v2fly/v2ray-core/v5/common/serial"
"github.com/v2fly/v2ray-core/v5/common/session" "github.com/v2fly/v2ray-core/v5/common/session"
@ -112,6 +114,12 @@ func (w *tcpWorker) Proxy() proxy.Inbound {
func (w *tcpWorker) Start() error { func (w *tcpWorker) Start() error {
ctx := context.Background() ctx := context.Background()
proxyEnvironment := envctx.EnvironmentFromContext(w.ctx).(environment.ProxyEnvironment)
transportEnvironment, err := proxyEnvironment.NarrowScopeToTransport("transport")
if err != nil {
return newError("unable to narrow environment to transport").Base(err)
}
ctx = envctx.ContextWithEnvironment(ctx, transportEnvironment)
hub, err := internet.ListenTCP(ctx, w.address, w.port, w.stream, func(conn internet.Connection) { hub, err := internet.ListenTCP(ctx, w.address, w.port, w.stream, func(conn internet.Connection) {
go w.callback(conn) go w.callback(conn)
}) })
@ -371,6 +379,12 @@ func (w *udpWorker) clean() error {
func (w *udpWorker) Start() error { func (w *udpWorker) Start() error {
w.activeConn = make(map[connID]*udpConn, 16) w.activeConn = make(map[connID]*udpConn, 16)
ctx := context.Background() ctx := context.Background()
proxyEnvironment := envctx.EnvironmentFromContext(w.ctx).(environment.ProxyEnvironment)
transportEnvironment, err := proxyEnvironment.NarrowScopeToTransport("transport")
if err != nil {
return newError("unable to narrow environment to transport").Base(err)
}
ctx = envctx.ContextWithEnvironment(ctx, transportEnvironment)
h, err := udp.ListenUDP(ctx, w.address, w.port, w.stream, udp.HubCapacity(256)) h, err := udp.ListenUDP(ctx, w.address, w.port, w.stream, udp.HubCapacity(256))
if err != nil { if err != nil {
return err return err
@ -480,6 +494,12 @@ func (w *dsWorker) Port() net.Port {
func (w *dsWorker) Start() error { func (w *dsWorker) Start() error {
ctx := context.Background() ctx := context.Background()
proxyEnvironment := envctx.EnvironmentFromContext(w.ctx).(environment.ProxyEnvironment)
transportEnvironment, err := proxyEnvironment.NarrowScopeToTransport("transport")
if err != nil {
return newError("unable to narrow environment to transport").Base(err)
}
ctx = envctx.ContextWithEnvironment(ctx, transportEnvironment)
hub, err := internet.ListenUnix(ctx, w.address, w.stream, func(conn internet.Connection) { hub, err := internet.ListenUnix(ctx, w.address, w.stream, func(conn internet.Connection) {
go w.callback(conn) go w.callback(conn)
}) })

View File

@ -7,6 +7,8 @@ import (
"github.com/v2fly/v2ray-core/v5/app/proxyman" "github.com/v2fly/v2ray-core/v5/app/proxyman"
"github.com/v2fly/v2ray-core/v5/common" "github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/dice" "github.com/v2fly/v2ray-core/v5/common/dice"
"github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/environment/envctx"
"github.com/v2fly/v2ray-core/v5/common/mux" "github.com/v2fly/v2ray-core/v5/common/mux"
"github.com/v2fly/v2ray-core/v5/common/net" "github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/net/packetaddr" "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
@ -50,6 +52,7 @@ func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter)
// Handler is an implements of outbound.Handler. // Handler is an implements of outbound.Handler.
type Handler struct { type Handler struct {
ctx context.Context
tag string tag string
senderSettings *proxyman.SenderConfig senderSettings *proxyman.SenderConfig
streamSettings *internet.MemoryStreamConfig streamSettings *internet.MemoryStreamConfig
@ -66,6 +69,7 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
v := core.MustFromContext(ctx) v := core.MustFromContext(ctx)
uplinkCounter, downlinkCounter := getStatCounter(v, config.Tag) uplinkCounter, downlinkCounter := getStatCounter(v, config.Tag)
h := &Handler{ h := &Handler{
ctx: ctx,
tag: config.Tag, tag: config.Tag,
outboundManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager), outboundManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
uplinkCounter: uplinkCounter, uplinkCounter: uplinkCounter,
@ -251,6 +255,12 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
return h.getStatCouterConnection(conn), nil return h.getStatCouterConnection(conn), nil
} }
proxyEnvironment := envctx.EnvironmentFromContext(h.ctx).(environment.ProxyEnvironment)
transportEnvironment, err := proxyEnvironment.NarrowScopeToTransport("transport")
if err != nil {
return nil, newError("unable to narrow environment to transport").Base(err)
}
ctx = envctx.ContextWithEnvironment(ctx, transportEnvironment)
conn, err := internet.Dial(ctx, dest, h.streamSettings) conn, err := internet.Dial(ctx, dest, h.streamSettings)
return h.getStatCouterConnection(conn), err return h.getStatCouterConnection(conn), err
} }

View File

@ -5,6 +5,12 @@ import (
"testing" "testing"
_ "unsafe" _ "unsafe"
"github.com/v2fly/v2ray-core/v5/common/environment/systemnetworkimpl"
"github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/environment/envctx"
"github.com/v2fly/v2ray-core/v5/common/environment/transientstorageimpl"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
core "github.com/v2fly/v2ray-core/v5" core "github.com/v2fly/v2ray-core/v5"
@ -43,6 +49,10 @@ func TestOutboundWithoutStatCounter(t *testing.T) {
v, _ := core.New(config) v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager))) v.AddFeature((outbound.Manager)(new(Manager)))
ctx := toContext(context.Background(), v) ctx := toContext(context.Background(), v)
defaultNetworkImpl := systemnetworkimpl.NewSystemNetworkDefault()
rootEnv := environment.NewRootEnvImpl(ctx, transientstorageimpl.NewScopedTransientStorageImpl(), defaultNetworkImpl.Dialer(), defaultNetworkImpl.Listener())
proxyEnvironment := rootEnv.ProxyEnvironment("o")
ctx = envctx.ContextWithEnvironment(ctx, proxyEnvironment)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{ h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag", Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}), ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
@ -72,6 +82,10 @@ func TestOutboundWithStatCounter(t *testing.T) {
v, _ := core.New(config) v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager))) v.AddFeature((outbound.Manager)(new(Manager)))
ctx := toContext(context.Background(), v) ctx := toContext(context.Background(), v)
defaultNetworkImpl := systemnetworkimpl.NewSystemNetworkDefault()
rootEnv := environment.NewRootEnvImpl(ctx, transientstorageimpl.NewScopedTransientStorageImpl(), defaultNetworkImpl.Dialer(), defaultNetworkImpl.Listener())
proxyEnvironment := rootEnv.ProxyEnvironment("o")
ctx = envctx.ContextWithEnvironment(ctx, proxyEnvironment)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{ h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag", Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}), ProxySettings: serial.ToTypedMessage(&freedom.Config{}),

View File

@ -9,12 +9,21 @@ import (
"github.com/v2fly/v2ray-core/v5/transport/internet/tagged" "github.com/v2fly/v2ray-core/v5/transport/internet/tagged"
) )
func NewRootEnvImpl(ctx context.Context, transientStorage storage.ScopedTransientStorage) RootEnvironment { func NewRootEnvImpl(ctx context.Context, transientStorage storage.ScopedTransientStorage,
return &rootEnvImpl{transientStorage: transientStorage, ctx: ctx} systemDialer internet.SystemDialer, systemListener internet.SystemListener,
) RootEnvironment {
return &rootEnvImpl{
transientStorage: transientStorage,
systemListener: systemListener,
systemDialer: systemDialer,
ctx: ctx,
}
} }
type rootEnvImpl struct { type rootEnvImpl struct {
transientStorage storage.ScopedTransientStorage transientStorage storage.ScopedTransientStorage
systemDialer internet.SystemDialer
systemListener internet.SystemListener
ctx context.Context ctx context.Context
} }
@ -30,6 +39,8 @@ func (r *rootEnvImpl) AppEnvironment(tag string) AppEnvironment {
} }
return &appEnvImpl{ return &appEnvImpl{
transientStorage: transientStorage, transientStorage: transientStorage,
systemListener: r.systemListener,
systemDialer: r.systemDialer,
ctx: r.ctx, ctx: r.ctx,
} }
} }
@ -41,12 +52,16 @@ func (r *rootEnvImpl) ProxyEnvironment(tag string) ProxyEnvironment {
} }
return &proxyEnvImpl{ return &proxyEnvImpl{
transientStorage: transientStorage, transientStorage: transientStorage,
systemListener: r.systemListener,
systemDialer: r.systemDialer,
ctx: r.ctx, ctx: r.ctx,
} }
} }
type appEnvImpl struct { type appEnvImpl struct {
transientStorage storage.ScopedTransientStorage transientStorage storage.ScopedTransientStorage
systemDialer internet.SystemDialer
systemListener internet.SystemListener
ctx context.Context ctx context.Context
} }
@ -98,6 +113,8 @@ func (a *appEnvImpl) NarrowScope(key string) (AppEnvironment, error) {
} }
return &appEnvImpl{ return &appEnvImpl{
transientStorage: transientStorage, transientStorage: transientStorage,
systemDialer: a.systemDialer,
systemListener: a.systemListener,
ctx: a.ctx, ctx: a.ctx,
}, nil }, nil
} }
@ -108,6 +125,8 @@ func (a *appEnvImpl) doNotImpl() {
type proxyEnvImpl struct { type proxyEnvImpl struct {
transientStorage storage.ScopedTransientStorage transientStorage storage.ScopedTransientStorage
systemDialer internet.SystemDialer
systemListener internet.SystemListener
ctx context.Context ctx context.Context
} }
@ -156,6 +175,8 @@ func (p *proxyEnvImpl) doNotImpl() {
type transportEnvImpl struct { type transportEnvImpl struct {
transientStorage storage.ScopedTransientStorage transientStorage storage.ScopedTransientStorage
systemDialer internet.SystemDialer
systemListener internet.SystemListener
ctx context.Context ctx context.Context
} }

View File

@ -0,0 +1,84 @@
package systemnetworkimpl
import (
"context"
"github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/transport/internet"
)
func NewSystemNetworkImpl(listener internet.SystemListener, dialer internet.SystemDialer) environment.SystemNetworkCapabilitySet {
return &systemNetworkImpl{dialer: dialer, listener: listener}
}
type systemDefaultDialer struct{}
func (s systemDefaultDialer) Listen(ctx context.Context, addr net.Addr, sockopt *internet.SocketConfig) (net.Listener, error) {
return internet.ListenSystem(ctx, addr, sockopt)
}
func (s systemDefaultDialer) ListenPacket(ctx context.Context, addr net.Addr, sockopt *internet.SocketConfig) (net.PacketConn, error) {
return internet.ListenSystemPacket(ctx, addr, sockopt)
}
func (s systemDefaultDialer) Dial(ctx context.Context, source net.Address, destination net.Destination, sockopt *internet.SocketConfig) (net.Conn, error) {
return internet.DialSystem(ctx, destination, sockopt)
}
func NewSystemNetworkDefault() environment.SystemNetworkCapabilitySet {
systemDefault := systemDefaultDialer{}
return &systemNetworkImpl{dialer: systemDefault, listener: systemDefault}
}
type systemNetworkImpl struct {
listener internet.SystemListener
dialer internet.SystemDialer
}
func (s systemNetworkImpl) Dialer() internet.SystemDialer {
return s.dialer
}
func (s systemNetworkImpl) Listener() internet.SystemListener {
return s.listener
}
func NewSystemListenerWithDefaultOpt(listener internet.SystemListener, opt *internet.SocketConfig) internet.SystemListener {
return systemListenerWithDefaultOpt{SystemListener: listener, opt: opt}
}
type systemListenerWithDefaultOpt struct {
internet.SystemListener
opt *internet.SocketConfig
}
func (s systemListenerWithDefaultOpt) Listen(ctx context.Context, addr net.Addr, sockopt *internet.SocketConfig) (net.Listener, error) {
if sockopt == nil {
return s.Listen(ctx, addr, s.opt)
}
return s.Listen(ctx, addr, sockopt)
}
func (s systemListenerWithDefaultOpt) ListenPacket(ctx context.Context, addr net.Addr, sockopt *internet.SocketConfig) (net.PacketConn, error) {
if sockopt == nil {
return s.ListenPacket(ctx, addr, s.opt)
}
return s.ListenPacket(ctx, addr, sockopt)
}
func NewSystemDialerWithDefaultOpt(listener internet.SystemDialer, opt *internet.SocketConfig) internet.SystemDialer {
return systemDialerWithDefaultOpt{SystemDialer: listener, opt: opt}
}
type systemDialerWithDefaultOpt struct {
internet.SystemDialer
opt *internet.SocketConfig
}
func (s systemDialerWithDefaultOpt) Dial(ctx context.Context, source net.Address, destination net.Destination, sockopt *internet.SocketConfig) (net.Conn, error) {
if sockopt == nil {
return s.Dial(ctx, source, destination, s.opt)
}
return s.Dial(ctx, source, destination, sockopt)
}

View File

@ -3,10 +3,11 @@ package core
import ( import (
"context" "context"
"reflect" "reflect"
"sync" sync "sync"
"github.com/v2fly/v2ray-core/v5/common" "github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/environment" "github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/environment/systemnetworkimpl"
"github.com/v2fly/v2ray-core/v5/common/environment/transientstorageimpl" "github.com/v2fly/v2ray-core/v5/common/environment/transientstorageimpl"
"github.com/v2fly/v2ray-core/v5/common/serial" "github.com/v2fly/v2ray-core/v5/common/serial"
"github.com/v2fly/v2ray-core/v5/features" "github.com/v2fly/v2ray-core/v5/features"
@ -191,7 +192,8 @@ func initInstanceWithConfig(config *Config, server *Instance) (bool, error) {
return true, err return true, err
} }
server.env = environment.NewRootEnvImpl(server.ctx, transientstorageimpl.NewScopedTransientStorageImpl()) defaultNetworkImpl := systemnetworkimpl.NewSystemNetworkDefault()
server.env = environment.NewRootEnvImpl(server.ctx, transientstorageimpl.NewScopedTransientStorageImpl(), defaultNetworkImpl.Dialer(), defaultNetworkImpl.Listener())
for _, appSettings := range config.App { for _, appSettings := range config.App {
settings, err := serial.GetInstanceOf(appSettings) settings, err := serial.GetInstanceOf(appSettings)