feat: refine handle logic

This commit is contained in:
AkinoKaede 2023-05-30 13:28:22 +08:00 committed by Shelikhoo
parent 5f6031b1e8
commit 0a090a073b
No known key found for this signature in database
GPG Key ID: C4D5E79D22B25316
5 changed files with 20 additions and 67 deletions

View File

@ -1,18 +1 @@
package tun
import (
tun_net "github.com/v2fly/v2ray-core/v5/app/tun/net"
)
var (
tcpQueue = make(chan tun_net.TCPConn)
udpQueue = make(chan tun_net.UDPConn)
)
func handleTCP(conn tun_net.TCPConn) {
tcpQueue <- conn
}
func handleUDP(conn tun_net.UDPConn) {
udpQueue <- conn
}

View File

@ -42,7 +42,7 @@ type TCPHandler struct {
stack *stack.Stack
}
func HandleTCP(handle func(tun_net.TCPConn)) StackOption {
func SetTCPHandler(ctx context.Context, dispatcher routing.Dispatcher, policyManager policy.Manager, config *Config) StackOption {
return func(s *stack.Stack) error {
tcpForwarder := tcp.NewForwarder(s, rcvWnd, maxInFlight, func(r *tcp.ForwarderRequest) {
wg := new(waiter.Queue)
@ -60,27 +60,22 @@ func HandleTCP(handle func(tun_net.TCPConn)) StackOption {
id: r.ID(),
}
handle(conn)
handler := &TCPHandler{
ctx: ctx,
dispatcher: dispatcher,
policyManager: policyManager,
config: config,
}
handler.Handle(conn)
})
s.SetTransportProtocolHandler(tcp.ProtocolNumber, tcpForwarder.HandlePacket)
return nil
}
}
func (h *TCPHandler) HandleQueue(ch chan tun_net.TCPConn) {
for {
select {
case conn := <-ch:
if err := h.Handle(conn); err != nil {
newError(err).AtError().WriteToLog(session.ExportIDToError(h.ctx))
}
case <-h.ctx.Done():
return
}
}
}
func (h *TCPHandler) Handle(conn tun_net.TCPConn) error {
defer conn.Close()
id := conn.ID()

View File

@ -36,7 +36,7 @@ func (c *udpConn) ID() *stack.TransportEndpointID {
return &c.id
}
func HandleUDP(handle func(tun_net.UDPConn)) StackOption {
func SetUDPHandler(ctx context.Context, dispatcher routing.Dispatcher, policyManager policy.Manager, config *Config) StackOption {
return func(s *stack.Stack) error {
udpForwarder := gvisor_udp.NewForwarder(s, func(r *gvisor_udp.ForwarderRequest) {
wg := new(waiter.Queue)
@ -51,26 +51,19 @@ func HandleUDP(handle func(tun_net.UDPConn)) StackOption {
id: r.ID(),
}
handle(conn)
handler := &UDPHandler{
ctx: ctx,
dispatcher: dispatcher,
policyManager: policyManager,
config: config,
}
handler.Handle(conn)
})
s.SetTransportProtocolHandler(gvisor_udp.ProtocolNumber, udpForwarder.HandlePacket)
return nil
}
}
func (h *UDPHandler) HandleQueue(ch chan tun_net.UDPConn) {
for {
select {
case <-h.ctx.Done():
return
case conn := <-ch:
if err := h.Handle(conn); err != nil {
newError(err).AtError().WriteToLog(session.ExportIDToError(h.ctx))
}
}
}
}
func (h *UDPHandler) Handle(conn tun_net.UDPConn) error {
defer conn.Close()
id := conn.ID()

View File

@ -29,8 +29,8 @@ func (t *TUN) CreateStack(linkedEndpoint stack.LinkEndpoint) (*stack.Stack, erro
nicID := tcpip.NICID(s.UniqueID())
opts := []StackOption{
HandleTCP(handleTCP),
HandleUDP(handleUDP),
SetTCPHandler(t.ctx, t.dispatcher, t.policyManager, t.config),
SetUDPHandler(t.ctx, t.dispatcher, t.policyManager, t.config),
CreateNIC(nicID, linkedEndpoint),
AddProtocolAddress(nicID, t.config.Ips),

View File

@ -46,24 +46,6 @@ func (t *TUN) Start() error {
}
t.stack = stack
tcpHandler := &TCPHandler{
ctx: t.ctx,
dispatcher: t.dispatcher,
policyManager: t.policyManager,
config: t.config,
stack: stack,
}
go tcpHandler.Handle(<-tcpQueue)
udpHander := &UDPHandler{
ctx: t.ctx,
dispatcher: t.dispatcher,
policyManager: t.policyManager,
config: t.config,
stack: stack,
}
go udpHander.Handle(<-udpQueue)
return nil
}