1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-11-10 06:16:53 -05:00
v2fly/app/tun/handler_tcp.go

167 lines
4.9 KiB
Go
Raw Normal View History

2023-05-28 00:18:58 -04:00
package tun
import (
"context"
2023-05-31 01:13:34 -04:00
"time"
2023-05-28 00:18:58 -04:00
2023-05-28 10:22:59 -04:00
tun_net "github.com/v2fly/v2ray-core/v5/app/tun/net"
2023-05-28 00:18:58 -04:00
"github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/log"
"github.com/v2fly/v2ray-core/v5/common/net"
2023-05-28 05:00:50 -04:00
"github.com/v2fly/v2ray-core/v5/common/session"
2023-05-28 00:18:58 -04:00
"github.com/v2fly/v2ray-core/v5/common/signal"
"github.com/v2fly/v2ray-core/v5/common/task"
"github.com/v2fly/v2ray-core/v5/features/policy"
"github.com/v2fly/v2ray-core/v5/features/routing"
2023-05-31 01:13:34 -04:00
internet "github.com/v2fly/v2ray-core/v5/transport/internet"
"gvisor.dev/gvisor/pkg/tcpip"
2023-05-28 00:18:58 -04:00
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
2023-05-31 01:13:34 -04:00
"gvisor.dev/gvisor/pkg/tcpip/header"
2023-05-28 00:18:58 -04:00
"gvisor.dev/gvisor/pkg/tcpip/stack"
"gvisor.dev/gvisor/pkg/tcpip/transport/tcp"
"gvisor.dev/gvisor/pkg/waiter"
)
2023-05-28 02:05:05 -04:00
// Tuning values handed to tcp.NewForwarder when the TCP handler is installed.
const (
	// maxInFlight is passed as the forwarder's max-in-flight argument
	// (2048, i.e. 2 << 10).
	maxInFlight = 2048

	// rcvWnd is passed as the forwarder's receive window argument;
	// zero selects the default settings.
	rcvWnd = 0
)
2023-05-28 09:08:36 -04:00
// tcpConn couples a gonet TCP connection with the transport endpoint ID of
// the forwarder request it was created from.
type tcpConn struct {
	// TCPConn is embedded so tcpConn exposes the connection's methods directly.
	*gonet.TCPConn

	// id holds the endpoint identifier taken from the forwarder request.
	id stack.TransportEndpointID
}
// ID returns a pointer to the transport endpoint ID stored on the connection.
// The pointer aliases the connection's own field rather than a copy.
func (c *tcpConn) ID() *stack.TransportEndpointID {
	endpointID := &c.id
	return endpointID
}
2023-05-28 00:18:58 -04:00
// TCPHandler relays TCP connections accepted by the TUN stack to the routing
// dispatcher.
type TCPHandler struct {
	// ctx is the base context from which each connection's context is derived.
	ctx context.Context

	// dispatcher opens the outbound link for a connection's destination.
	dispatcher routing.Dispatcher

	// policyManager supplies the session policy (timeouts) for the configured
	// user level.
	policyManager policy.Manager

	// config provides the inbound tag and user level used by Handle.
	config *Config
}
2023-05-30 01:28:22 -04:00
func SetTCPHandler(ctx context.Context, dispatcher routing.Dispatcher, policyManager policy.Manager, config *Config) StackOption {
2023-05-28 02:32:48 -04:00
return func(s *stack.Stack) error {
tcpForwarder := tcp.NewForwarder(s, rcvWnd, maxInFlight, func(r *tcp.ForwarderRequest) {
wg := new(waiter.Queue)
linkedEndpoint, err := r.CreateEndpoint(wg)
if err != nil {
r.Complete(true)
return
}
defer r.Complete(false)
2023-05-28 00:18:58 -04:00
2023-05-31 01:13:34 -04:00
if err := applySocketOptions(s, linkedEndpoint, config.SocketSettings); err != nil {
newError("failed to apply socket options: ", err).WriteToLog(session.ExportIDToError(ctx))
}
2023-05-28 00:18:58 -04:00
2023-05-28 10:43:08 -04:00
conn := &tcpConn{
2023-05-28 09:08:36 -04:00
TCPConn: gonet.NewTCPConn(wg, linkedEndpoint),
id: r.ID(),
2023-05-28 02:32:48 -04:00
}
2023-05-30 01:28:22 -04:00
handler := &TCPHandler{
ctx: ctx,
dispatcher: dispatcher,
policyManager: policyManager,
config: config,
}
go handler.Handle(conn)
2023-05-28 02:32:48 -04:00
})
2023-05-30 01:28:22 -04:00
2023-05-28 02:32:48 -04:00
s.SetTransportProtocolHandler(tcp.ProtocolNumber, tcpForwarder.HandlePacket)
return nil
}
2023-05-28 00:18:58 -04:00
}
2023-05-28 10:22:59 -04:00
func (h *TCPHandler) Handle(conn tun_net.TCPConn) error {
defer conn.Close()
id := conn.ID()
2023-05-28 05:00:50 -04:00
ctx := session.ContextWithInbound(h.ctx, &session.Inbound{Tag: h.config.Tag})
2023-05-28 00:18:58 -04:00
sessionPolicy := h.policyManager.ForLevel(h.config.UserLevel)
2023-05-28 10:22:59 -04:00
dest := net.TCPDestination(tun_net.AddressFromTCPIPAddr(id.LocalAddress), net.Port(id.LocalPort))
src := net.TCPDestination(tun_net.AddressFromTCPIPAddr(id.RemoteAddress), net.Port(id.RemotePort))
ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
2023-05-28 10:32:01 -04:00
From: src,
2023-05-28 00:18:58 -04:00
To: dest,
Status: log.AccessAccepted,
Reason: "",
})
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
link, err := h.dispatcher.Dispatch(ctx, dest)
if err != nil {
return newError("failed to dispatch").Base(err)
}
responseDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
if err := buf.Copy(link.Reader, buf.NewWriter(conn), buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport all TCP response").Base(err)
}
return nil
}
requestDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
if err := buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer)); err != nil {
return newError("failed to transport all TCP request").Base(err)
}
return nil
}
requestDoneAndCloseWriter := task.OnSuccess(requestDone, task.Close(link.Writer))
if err := task.Run(h.ctx, requestDoneAndCloseWriter, responseDone); err != nil {
common.Interrupt(link.Reader)
common.Interrupt(link.Writer)
return newError("connection ends").Base(err)
}
return nil
}
2023-05-31 01:13:34 -04:00
// applySocketOptions configures a freshly created TCP endpoint.
//
// When config is non-nil, positive TcpKeepAliveInterval and TcpKeepAliveIdle
// values (in seconds) are applied to the endpoint, and keep-alive is enabled
// if either is set. A nil config skips the keep-alive settings. The endpoint's
// send and receive buffer sizes are then set to the stack's default TCP send
// and receive buffer sizes, when the stack reports them.
//
// It returns the first error from setting a keep-alive option, or nil.
func applySocketOptions(s *stack.Stack, endpoint tcpip.Endpoint, config *internet.SocketConfig) tcpip.Error {
	// config may be nil when no socket settings were provided; guard the field
	// accesses so that case does not panic.
	if config != nil {
		if config.TcpKeepAliveInterval > 0 {
			interval := tcpip.KeepaliveIntervalOption(time.Duration(config.TcpKeepAliveInterval) * time.Second)
			if err := endpoint.SetSockOpt(&interval); err != nil {
				return err
			}
		}

		if config.TcpKeepAliveIdle > 0 {
			idle := tcpip.KeepaliveIdleOption(time.Duration(config.TcpKeepAliveIdle) * time.Second)
			if err := endpoint.SetSockOpt(&idle); err != nil {
				return err
			}
		}

		if config.TcpKeepAliveInterval > 0 || config.TcpKeepAliveIdle > 0 {
			endpoint.SocketOptions().SetKeepAlive(true)
		}
	}

	// Buffer sizing is best effort: if the stack cannot report a range, the
	// endpoint keeps whatever size it already has.
	var sendBufferSizeRange tcpip.TCPSendBufferSizeRangeOption
	if err := s.TransportProtocolOption(header.TCPProtocolNumber, &sendBufferSizeRange); err == nil {
		// The send default goes to the send buffer (not the receive buffer).
		endpoint.SocketOptions().SetSendBufferSize(int64(sendBufferSizeRange.Default), false)
	}

	var receiveBufferSizeRange tcpip.TCPReceiveBufferSizeRangeOption
	if err := s.TransportProtocolOption(header.TCPProtocolNumber, &receiveBufferSizeRange); err == nil {
		// The receive default goes to the receive buffer (not the send buffer).
		endpoint.SocketOptions().SetReceiveBufferSize(int64(receiveBufferSizeRange.Default), false)
	}

	return nil
}