diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index e0d183682..b66534fd1 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -8,6 +8,7 @@ import ( "github.com/v2fly/v2ray-core/v5/common" "github.com/v2fly/v2ray-core/v5/common/buf" "github.com/v2fly/v2ray-core/v5/common/net" + "github.com/v2fly/v2ray-core/v5/common/net/packetaddr" "github.com/v2fly/v2ray-core/v5/common/protocol" "github.com/v2fly/v2ray-core/v5/common/retry" "github.com/v2fly/v2ray-core/v5/common/session" @@ -16,9 +17,10 @@ import ( "github.com/v2fly/v2ray-core/v5/features/policy" "github.com/v2fly/v2ray-core/v5/transport" "github.com/v2fly/v2ray-core/v5/transport/internet" + "github.com/v2fly/v2ray-core/v5/transport/internet/udp" ) -// Client is an inbound handler for Shadowsocks protocol +// Client is a inbound handler for Shadowsocks protocol type Client struct { serverPicker protocol.ServerPicker policyManager policy.Manager @@ -99,6 +101,28 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) + if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil { + requestDone := func() error { + protocolWriter := &UDPWriter{ + Writer: conn, + Request: request, + } + return udp.CopyPacketConn(protocolWriter, packetConn, udp.UpdateActivity(timer)) + } + responseDone := func() error { + protocolReader := &UDPReader{ + Reader: conn, + User: user, + } + return udp.CopyPacketConn(packetConn, protocolReader, udp.UpdateActivity(timer)) + } + responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer)) + if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil { + return newError("connection ends").Base(err) + } + return nil + } + if request.Command == protocol.RequestCommandTCP { requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) diff --git a/proxy/shadowsocks/protocol.go b/proxy/shadowsocks/protocol.go index ae19bf11e..55549d907 100644 --- a/proxy/shadowsocks/protocol.go +++ b/proxy/shadowsocks/protocol.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "hash/crc32" "io" + gonet "net" "github.com/v2fly/v2ray-core/v5/common" "github.com/v2fly/v2ray-core/v5/common/buf" @@ -253,6 +254,23 @@ func (v *UDPReader) ReadMultiBuffer() (buf.MultiBuffer, error) { return buf.MultiBuffer{payload}, nil } +func (v *UDPReader) ReadFrom(p []byte) (n int, addr gonet.Addr, err error) { + buffer := buf.New() + _, err = buffer.ReadFrom(v.Reader) + if err != nil { + buffer.Release() + return 0, nil, err + } + vaddr, payload, err := DecodeUDPPacket(v.User, buffer) + if err != nil { + buffer.Release() + return 0, nil, err + } + n = copy(p, payload.Bytes()) + payload.Release() + return n, &gonet.UDPAddr{IP: vaddr.Address.IP(), Port: int(vaddr.Port)}, nil +} + type UDPWriter struct { Writer io.Writer Request *protocol.RequestHeader @@ -268,3 +286,18 @@ func (w *UDPWriter) Write(payload []byte) (int, error) { packet.Release() return len(payload), err } + +func (w *UDPWriter) WriteTo(payload []byte, addr gonet.Addr) (n int, err error) { + request := *w.Request + udpAddr := addr.(*gonet.UDPAddr) + request.Command = protocol.RequestCommandUDP + request.Address = net.IPAddress(udpAddr.IP) + request.Port = net.Port(udpAddr.Port) + packet, err := EncodeUDPPacket(&request, payload) + if err != nil { + return 0, err + } + _, err = w.Writer.Write(packet.Bytes()) + packet.Release() + return len(payload), err +} diff --git a/transport/internet/connection.go b/transport/internet/connection.go index 6d77721d9..3484c1fd8 100644 --- a/transport/internet/connection.go +++ b/transport/internet/connection.go @@ -3,13 +3,33 @@ package internet import ( "net" - "github.com/v2fly/v2ray-core/v5/features/stats" + "github.com/v2fly/v2ray-core/v4/common" + "github.com/v2fly/v2ray-core/v4/features/stats" ) type Connection interface { net.Conn } +type AbstractPacketConnReader interface { + ReadFrom(p []byte) (n int, addr net.Addr, err error) +} + +type AbstractPacketConnWriter interface { + WriteTo(p []byte, addr net.Addr) (n int, err error) +} + +type AbstractPacketConn interface { + AbstractPacketConnReader + AbstractPacketConnWriter + common.Closable +} + +type PacketConn interface { + AbstractPacketConn + net.PacketConn +} + type StatCouterConnection struct { Connection ReadCounter stats.Counter diff --git a/transport/internet/udp/copy.go b/transport/internet/udp/copy.go new file mode 100644 index 000000000..f5f83e38d --- /dev/null +++ b/transport/internet/udp/copy.go @@ -0,0 +1,47 @@ +package udp + +import ( + gonet "net" + + "github.com/v2fly/v2ray-core/v4/common/signal" + "github.com/v2fly/v2ray-core/v4/transport/internet" +) + +type dataHandler func(content []byte, address gonet.Addr) + +type copyHandler struct { + onData []dataHandler +} + +type CopyOption func(*copyHandler) + +func CopyPacketConn(dst internet.AbstractPacketConnWriter, src internet.AbstractPacketConnReader, options ...CopyOption) error { + var handler copyHandler + for _, option := range options { + option(&handler) + } + var buffer [2048]byte + for { + n, addr, err := src.ReadFrom(buffer[:]) + if err != nil { + return err + } + + for _, handler := range handler.onData { + handler(buffer[:n], addr) + } + + n, err = dst.WriteTo(buffer[:n], addr) + if err != nil { + return err + } + } +} + +func UpdateActivity(timer signal.ActivityUpdater) CopyOption { + return func(handler *copyHandler) { + handler.onData = append(handler.onData, func(content []byte, address gonet.Addr) { + timer.Update() + }) + } +}