From 4c79629c69e7d441fb292dd2675ab91a353b3692 Mon Sep 17 00:00:00 2001 From: Shelikhoo Date: Mon, 18 Apr 2022 20:02:44 +0100 Subject: [PATCH] Add Support for VLite UDP Server --- proxy/vlite/inbound/config.proto | 1 + proxy/vlite/inbound/connAdp.go | 48 ++++++++ proxy/vlite/inbound/inbound.go | 202 +++++++++++++++++++++++++++++++ 3 files changed, 251 insertions(+) create mode 100644 proxy/vlite/inbound/connAdp.go create mode 100644 proxy/vlite/inbound/inbound.go diff --git a/proxy/vlite/inbound/config.proto b/proxy/vlite/inbound/config.proto index 09584ad7a..ba73af00c 100644 --- a/proxy/vlite/inbound/config.proto +++ b/proxy/vlite/inbound/config.proto @@ -16,5 +16,6 @@ message UDPProtocolConfig { bool scramble_packet = 4; bool enable_fec = 5; bool enable_stabilization = 6; + bool enable_renegotiation = 7; uint32 handshake_masking_padding_size = 8; } \ No newline at end of file diff --git a/proxy/vlite/inbound/connAdp.go b/proxy/vlite/inbound/connAdp.go new file mode 100644 index 000000000..d88c7edcb --- /dev/null +++ b/proxy/vlite/inbound/connAdp.go @@ -0,0 +1,48 @@ +package inbound + +import ( + "github.com/v2fly/v2ray-core/v5/common/buf" + "github.com/v2fly/v2ray-core/v5/common/signal/done" + "io" + "net" +) + +func newUDPConnAdaptor(conn net.Conn, done *done.Instance) net.Conn { + return &udpConnAdp{ + Conn: conn, + reader: buf.NewPacketReader(conn), + cachedMultiBuffer: nil, + finished: done, + } +} + +type udpConnAdp struct { + net.Conn + reader buf.Reader + + cachedMultiBuffer buf.MultiBuffer + + finished *done.Instance +} + +func (u *udpConnAdp) Read(p []byte) (n int, err error) { + if u.cachedMultiBuffer.IsEmpty() { + u.cachedMultiBuffer, err = u.reader.ReadMultiBuffer() + if err != nil { + return 0, newError("unable to read from connection").Base(err) + } + } + var buffer *buf.Buffer + u.cachedMultiBuffer, buffer = buf.SplitFirst(u.cachedMultiBuffer) + defer buffer.Release() + n = copy(p, buffer.Bytes()) + if n != int(buffer.Len()) { + return 0, io.ErrShortBuffer + } + return n, nil +} + +func (u *udpConnAdp) Close() error { + u.finished.Close() + return u.Conn.Close() +} diff --git a/proxy/vlite/inbound/inbound.go b/proxy/vlite/inbound/inbound.go new file mode 100644 index 000000000..6801d21a3 --- /dev/null +++ b/proxy/vlite/inbound/inbound.go @@ -0,0 +1,202 @@ +package inbound + +import ( + "context" + "github.com/mustafaturan/bus" + "github.com/v2fly/v2ray-core/v5/common" + "github.com/v2fly/v2ray-core/v5/common/environment" + "github.com/v2fly/v2ray-core/v5/common/environment/envctx" + "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/common/session" + "github.com/v2fly/v2ray-core/v5/common/signal/done" + "github.com/v2fly/v2ray-core/v5/features/routing" + "github.com/v2fly/v2ray-core/v5/transport/internet" + "github.com/xiaokangwang/VLite/interfaces" + "github.com/xiaokangwang/VLite/interfaces/ibus" + "github.com/xiaokangwang/VLite/transport" + udpsctpserver "github.com/xiaokangwang/VLite/transport/packetsctp/sctprelay" + "github.com/xiaokangwang/VLite/transport/packetuni/puniServer" + "github.com/xiaokangwang/VLite/transport/udp/udpServer" + "github.com/xiaokangwang/VLite/transport/udp/udpuni/udpunis" + "github.com/xiaokangwang/VLite/transport/uni/uniserver" + "github.com/xiaokangwang/VLite/workers/server" + "io" + gonet "net" + "strconv" + "sync" +) + +//go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen + +func NewUDPInboundHandler(ctx context.Context, config *UDPProtocolConfig) (*Handler, error) { + proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment) + statusInstance, err := createStatusFromConfig(config) + if err != nil { + return nil, newError("unable to initialize vlite").Base(err) + } + proxyEnvironment.TransientStorage().Put(ctx, "status", statusInstance) + return &Handler{ctx: ctx}, nil +} + +type Handler struct { + ctx context.Context +} + +func (h *Handler) Network() []net.Network { + list := []net.Network{net.Network_UDP} + return list +} + +type status struct { + config *UDPProtocolConfig + + password []byte + msgbus *bus.Bus + + ctx context.Context + + transport transport.UnderlayTransportListener + + access sync.Mutex +} + +func (s *status) RelayStream(conn io.ReadWriteCloser, ctx context.Context) { +} + +func (s *status) Connection(conn gonet.Conn, connctx context.Context) context.Context { + S_S2CTraffic := make(chan server.UDPServerTxToClientTraffic, 8) + S_S2CDataTraffic := make(chan server.UDPServerTxToClientDataTraffic, 8) + S_C2STraffic := make(chan server.UDPServerRxFromClientTraffic, 8) + + S_S2CTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) + S_S2CDataTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) + S_C2STraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) + + go func(ctx context.Context) { + for { + select { + case data := <-S_S2CTraffic: + S_S2CTraffic2 <- interfaces.TrafficWithChannelTag(data) + case <-ctx.Done(): + return + } + } + }(connctx) + + go func(ctx context.Context) { + for { + select { + case data := <-S_S2CDataTraffic: + S_S2CDataTraffic2 <- interfaces.TrafficWithChannelTag(data) + case <-ctx.Done(): + return + } + } + + }(connctx) + + go func(ctx context.Context) { + for { + select { + case data := <-S_C2STraffic2: + S_C2STraffic <- server.UDPServerRxFromClientTraffic(data) + case <-ctx.Done(): + return + } + } + + }(connctx) + + if s.config.EnableStabilization && s.config.EnableRenegotiation { + relay := udpsctpserver.NewPacketRelayServer(conn, S_S2CTraffic2, S_S2CDataTraffic2, S_C2STraffic2, s, s.password, connctx) + udpserver := server.UDPServer(connctx, S_S2CTraffic, S_S2CDataTraffic, S_C2STraffic, relay) + _ = udpserver + } else { + relay := puniServer.NewPacketUniServer(S_S2CTraffic2, S_S2CDataTraffic2, S_C2STraffic2, s, s.password, connctx) + relay.OnAutoCarrier(conn, connctx) + udpserver := server.UDPServer(connctx, S_S2CTraffic, S_S2CDataTraffic, S_C2STraffic, relay) + _ = udpserver + } + return nil +} + +func createStatusFromConfig(config *UDPProtocolConfig) (*status, error) { + s := &status{ctx: context.Background(), config: config} + + s.password = []byte(config.Password) + + s.msgbus = ibus.NewMessageBus() + s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsMessageBus, s.msgbus) + + if config.ScramblePacket { + s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPShouldMask, true) + } + + if s.config.EnableFec { + s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPFECEnabled, true) + } + + s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPMask, string(s.password)) + + if config.HandshakeMaskingPaddingSize != 0 { + ctxv := &interfaces.ExtraOptionsUsePacketArmorValue{PacketArmorPaddingTo: int(config.HandshakeMaskingPaddingSize), UsePacketArmor: true} + s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUsePacketArmor, ctxv) + } + + return s, nil +} + +func enableInterface(s *status) error { + s.transport = s + if s.config.EnableStabilization { + s.transport = uniserver.NewUnifiedConnectionTransportHub(s, s.ctx) + } + s.transport = udpunis.NewUdpUniServer(string(s.password), s.ctx, s.transport) + return nil +} + +func (h *Handler) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error { + proxyEnvironment := envctx.EnvironmentFromContext(h.ctx).(environment.ProxyEnvironment) + statusInstanceIfce, err := proxyEnvironment.TransientStorage().Get(ctx, "status") + if err != nil { + return newError("uninitialized handler").Base(err) + } + statusInstance := statusInstanceIfce.(*status) + err = h.ensureStarted(statusInstance) + if err != nil { + return newError("unable to initialize").Base(err) + } + finish := done.New() + conn = newUDPConnAdaptor(conn, finish) + var initialData [1600]byte + c, err := conn.Read(initialData[:]) + if err != nil { + return newError("unable to read initial data").Base(err) + } + connID := session.IDFromContext(ctx) + vconn, connctx := udpServer.PrepareIncomingUDPConnection(conn, statusInstance.ctx, initialData[:c], strconv.FormatInt(int64(connID), 10)) + connctx = statusInstance.transport.Connection(vconn, connctx) + if connctx == nil { + return newError("invalid connection discarded") + } + <-finish.Wait() + return nil +} + +func (h *Handler) ensureStarted(s *status) error { + s.access.Lock() + defer s.access.Unlock() + if s.transport == nil { + err := enableInterface(s) + if err != nil { + return err + } + } + return nil +} + +func init() { + common.Must(common.RegisterConfig((*UDPProtocolConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + return NewUDPInboundHandler(ctx, config.(*UDPProtocolConfig)) + })) +}