1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-09-26 13:56:12 -04:00

add shadowsocks2022 udp client support

This commit is contained in:
Shelikhoo 2023-11-18 23:09:26 +00:00 committed by Xiaokang Wang (Shelikhoo)
parent e575a525bb
commit d8e32f17bd
8 changed files with 623 additions and 24 deletions

View File

@ -4,13 +4,19 @@ import (
"context"
"github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/environment"
"github.com/v2fly/v2ray-core/v5/common/environment/envctx"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
"github.com/v2fly/v2ray-core/v5/common/retry"
"github.com/v2fly/v2ray-core/v5/common/session"
"github.com/v2fly/v2ray-core/v5/common/signal"
"github.com/v2fly/v2ray-core/v5/common/task"
"github.com/v2fly/v2ray-core/v5/transport"
"github.com/v2fly/v2ray-core/v5/transport/internet"
"github.com/v2fly/v2ray-core/v5/transport/internet/udp"
gonet "net"
"sync"
"time"
)
@ -19,6 +25,33 @@ type Client struct {
ctx context.Context
}
const UDPConnectionState = "UDPConnectionState"
type ClientUDPConnState struct {
session *ClientUDPSession
initOnce *sync.Once
}
func (c *ClientUDPConnState) GetOrCreateSession(create func() (*ClientUDPSession, error)) (*ClientUDPSession, error) {
var err error
c.initOnce.Do(func() {
sessionState, err := create()
if err != nil {
err = newError("failed to create UDP session").Base(err)
return
}
c.session = sessionState
})
if err != nil {
return nil, newError("failed to initialize UDP State").Base(err)
}
return c.session, nil
}
func NewClientUDPConnState() (*ClientUDPConnState, error) {
return &ClientUDPConnState{initOnce: &sync.Once{}}, nil
}
func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() {
@ -27,25 +60,6 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
destination := outbound.Target
network := destination.Network
var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error {
dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port))
dest.Network = network
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return err
}
conn = rawConn
return nil
})
if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err)
}
newError("tunneling request to ", destination, " via ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close()
var keyDerivation = newBLAKE3KeyDerivation()
var method Method
switch c.config.Method {
@ -62,7 +76,44 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute)
if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil {
udpSession, err := c.getUDPSession(c.ctx, network, dialer, method, keyDerivation)
if err != nil {
return newError("failed to get UDP udpSession").Base(err)
}
requestDone := func() error {
return udp.CopyPacketConn(udpSession, packetConn, udp.UpdateActivity(timer))
}
responseDone := func() error {
return udp.CopyPacketConn(packetConn, udpSession, udp.UpdateActivity(timer))
}
responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
return newError("connection ends").Base(err)
}
return nil
}
if network == net.Network_TCP {
var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error {
dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port))
dest.Network = network
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return err
}
conn = rawConn
return nil
})
if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err)
}
newError("tunneling request to ", destination, " via ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close()
request := &TCPRequest{
keyDerivation: keyDerivation,
method: method,
@ -107,11 +158,79 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
}
return nil
} else {
return newError("not implemented")
udpSession, err := c.getUDPSession(c.ctx, network, dialer, method, keyDerivation)
if err != nil {
return newError("failed to get UDP udpSession").Base(err)
}
monoDestUDPConn := udp.NewMonoDestUDPConn(udpSession, &gonet.UDPAddr{IP: destination.Address.IP(), Port: int(destination.Port)})
requestDone := func() error {
return buf.Copy(link.Reader, monoDestUDPConn, buf.UpdateActivity(timer))
}
responseDone := func() error {
return buf.Copy(monoDestUDPConn, link.Writer, buf.UpdateActivity(timer))
}
responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
return newError("connection ends").Base(err)
}
return nil
}
}
func (c *Client) getUDPSession(ctx context.Context, network net.Network, dialer internet.Dialer, method Method, keyDerivation *BLAKE3KeyDerivation) (internet.AbstractPacketConn, error) {
storage := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment).TransientStorage()
clientUDPStateIfce, err := storage.Get(ctx, UDPConnectionState)
if err != nil {
return nil, newError("failed to get UDP connection state").Base(err)
}
clientUDPState, ok := clientUDPStateIfce.(*ClientUDPConnState)
if !ok {
return nil, newError("failed to cast UDP connection state")
}
sessionState, err := clientUDPState.GetOrCreateSession(func() (*ClientUDPSession, error) {
var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error {
dest := net.TCPDestination(c.config.Address.AsAddress(), net.Port(c.config.Port))
dest.Network = network
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return err
}
conn = rawConn
return nil
})
if err != nil {
return nil, newError("failed to find an available destination").AtWarning().Base(err)
}
newError("creating udp session to ", network, ":", c.config.Address).WriteToLog(session.ExportIDToError(ctx))
packetProcessor, err := method.GetUDPClientProcessor(c.config.Ipsk, c.config.Psk, keyDerivation)
if err != nil {
return nil, newError("failed to create UDP client packet processor").Base(err)
}
return NewClientUDPSession(ctx, conn, packetProcessor), nil
})
if err != nil {
return nil, newError("failed to create UDP session").Base(err)
}
sessionConn, err := sessionState.NewSessionConn()
if err != nil {
return nil, newError("failed to create UDP session connection").Base(err)
}
return sessionConn, nil
}
func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
storage := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment).TransientStorage()
udpState, err := NewClientUDPConnState()
if err != nil {
return nil, newError("failed to create UDP connection state").Base(err)
}
storage.Put(ctx, UDPConnectionState, udpState)
return &Client{
config: config,
ctx: ctx,

View File

@ -0,0 +1,162 @@
package shadowsocks2022
import (
"context"
"crypto/rand"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/transport/internet"
"io"
gonet "net"
"sync"
"time"
)
func NewClientUDPSession(ctx context.Context, conn io.ReadWriteCloser, packetProcessor UDPClientPacketProcessor) *ClientUDPSession {
session := &ClientUDPSession{
locker: &sync.Mutex{},
conn: conn,
packetProcessor: packetProcessor,
sessionMap: make(map[string]*ClientUDPSessionConn),
}
session.ctx, session.finish = context.WithCancel(ctx)
go session.KeepReading()
return session
}
type ClientUDPSession struct {
locker *sync.Mutex
conn io.ReadWriteCloser
packetProcessor UDPClientPacketProcessor
sessionMap map[string]*ClientUDPSessionConn
ctx context.Context
finish func()
}
func (c *ClientUDPSession) Close() error {
c.finish()
return c.conn.Close()
}
func (c *ClientUDPSession) WriteUDPRequest(request *UDPRequest) error {
buffer := buf.New()
defer buffer.Release()
err := c.packetProcessor.EncodeUDPRequest(request, buffer)
if request.Payload != nil {
request.Payload.Release()
}
if err != nil {
return newError("unable to encode udp request").Base(err)
}
_, err = c.conn.Write(buffer.Bytes())
if err != nil {
return newError("unable to write to conn").Base(err)
}
return nil
}
func (c *ClientUDPSession) KeepReading() {
for c.ctx.Err() == nil {
udpResp := &UDPResponse{}
buffer := make([]byte, 1600)
n, err := c.conn.Read(buffer)
if err != nil {
newError("unable to read from conn").Base(err).WriteToLog()
return
}
if n != 0 {
err := c.packetProcessor.DecodeUDPResp(buffer[:n], udpResp)
if err != nil {
newError("unable to decode udp response").Base(err).WriteToLog()
continue
}
c.locker.Lock()
session, ok := c.sessionMap[string(udpResp.ClientSessionID[:])]
if ok {
select {
case session.readChan <- udpResp:
default:
}
} else {
newError("misbehaving server: unknown client session ID").Base(err).WriteToLog()
}
c.locker.Unlock()
}
}
}
func (c *ClientUDPSession) NewSessionConn() (internet.AbstractPacketConn, error) {
sessionID := make([]byte, 8)
_, err := rand.Read(sessionID)
if err != nil {
return nil, newError("unable to generate session id").Base(err)
}
connctx, connfinish := context.WithCancel(c.ctx)
sessionConn := &ClientUDPSessionConn{
sessionID: string(sessionID),
readChan: make(chan *UDPResponse, 16),
parent: c,
ctx: connctx,
finish: connfinish,
nextWritePacketID: 0,
}
c.locker.Lock()
c.sessionMap[sessionConn.sessionID] = sessionConn
c.locker.Unlock()
return sessionConn, nil
}
type ClientUDPSessionConn struct {
sessionID string
readChan chan *UDPResponse
parent *ClientUDPSession
nextWritePacketID uint64
ctx context.Context
finish func()
}
func (c *ClientUDPSessionConn) Close() error {
delete(c.parent.sessionMap, c.sessionID)
c.finish()
return nil
}
func (c *ClientUDPSessionConn) WriteTo(p []byte, addr gonet.Addr) (n int, err error) {
thisPacketID := c.nextWritePacketID
c.nextWritePacketID += 1
req := &UDPRequest{
SessionID: [8]byte{},
PacketID: thisPacketID,
TimeStamp: uint64(time.Now().Unix()),
Address: net.IPAddress(addr.(*gonet.UDPAddr).IP),
Port: addr.(*net.UDPAddr).Port,
Payload: nil,
}
copy(req.SessionID[:], c.sessionID)
req.Payload = buf.New()
req.Payload.Write(p)
err = c.parent.WriteUDPRequest(req)
if err != nil {
return 0, newError("unable to write to parent session").Base(err)
}
return len(p), nil
}
func (c *ClientUDPSessionConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case <-c.ctx.Done():
return 0, nil, io.EOF
case resp := <-c.readChan:
n = copy(p, resp.Payload.Bytes())
resp.Payload.Release()
addr = &net.UDPAddr{IP: resp.Address.IP(), Port: int(resp.Port)}
}
return
}

View File

@ -1,6 +1,7 @@
package shadowsocks2022
import (
"crypto/subtle"
"github.com/lunixbochs/struc"
"github.com/v2fly/v2ray-core/v5/common/buf"
"io"
@ -78,18 +79,33 @@ func newAESEIHGeneratorContainer(size int, effectivePsk []byte, ipsk [][]byte) *
}
func (a *aesEIHGenerator) GenerateEIH(derivation KeyDerivation, method Method, salt []byte) (ExtensibleIdentityHeaders, error) {
return a.generateEIHWithMask(derivation, method, salt, nil)
}
func (a *aesEIHGenerator) GenerateEIHUDP(derivation KeyDerivation, method Method, mask []byte) (ExtensibleIdentityHeaders, error) {
return a.generateEIHWithMask(derivation, method, nil, mask)
}
func (a *aesEIHGenerator) generateEIHWithMask(derivation KeyDerivation, method Method, salt, mask []byte) (ExtensibleIdentityHeaders, error) {
eih := make([][16]byte, a.length)
current := a.length - 1
currentPskHash := a.pskHash
for {
identityKeyBuf := buf.New()
identityKey := identityKeyBuf.Extend(int32(method.GetSessionSubKeyAndSaltLength()))
err := derivation.GetIdentitySubKey(a.ipsk[current], salt, identityKey)
if err != nil {
return nil, newError("failed to get identity sub key").Base(err)
if mask == nil {
err := derivation.GetIdentitySubKey(a.ipsk[current], salt, identityKey)
if err != nil {
return nil, newError("failed to get identity sub key").Base(err)
}
} else {
copy(identityKey, a.ipsk[current])
}
eih[current] = [16]byte{}
err = method.GenerateEIH(identityKey, currentPskHash[:], eih[current][:])
if mask != nil {
subtle.XORBytes(currentPskHash[:], mask, currentPskHash[:])
}
err := method.GenerateEIH(identityKey, currentPskHash[:], eih[current][:])
if err != nil {
return nil, newError("failed to generate EIH").Base(err)
}

View File

@ -36,3 +36,37 @@ func (A AES128GCMMethod) GenerateEIH(CurrentIdentitySubKey []byte, nextPskHash [
aesCipher.Encrypt(out, nextPskHash)
return nil
}
func (A AES128GCMMethod) GetUDPClientProcessor(ipsk [][]byte, psk []byte, derivation KeyDerivation) (UDPClientPacketProcessor, error) {
reqSeparateHeaderPsk := psk
if ipsk != nil {
reqSeparateHeaderPsk = ipsk[0]
}
reqSeparateHeaderCipher, err := aes.NewCipher(reqSeparateHeaderPsk)
if err != nil {
return nil, newError("failed to create AES cipher").Base(err)
}
respSeparateHeaderCipher, err := aes.NewCipher(psk)
if err != nil {
return nil, newError("failed to create AES cipher").Base(err)
}
getPacketAEAD := func(sessionID []byte) cipher.AEAD {
sessionKey := make([]byte, A.GetSessionSubKeyAndSaltLength())
derivation.GetSessionSubKey(psk, sessionID, sessionKey)
block, err := aes.NewCipher(sessionKey)
aead, err := cipher.NewGCM(block)
if err != nil {
panic(err)
}
return aead
}
eihGenerator := newAESEIHGeneratorContainer(len(ipsk), psk, ipsk)
getEIH := func(mask []byte) ExtensibleIdentityHeaders {
eih, err := eihGenerator.GenerateEIHUDP(derivation, A, mask)
if err != nil {
newError("failed to generate EIH").Base(err).WriteToLog()
}
return eih
}
return NewAESUDPClientPacketProcessor(reqSeparateHeaderCipher, respSeparateHeaderCipher, getPacketAEAD, getEIH), nil
}

View File

@ -36,3 +36,37 @@ func (A AES256GCMMethod) GenerateEIH(CurrentIdentitySubKey []byte, nextPskHash [
aesCipher.Encrypt(out, nextPskHash)
return nil
}
func (A AES256GCMMethod) GetUDPClientProcessor(ipsk [][]byte, psk []byte, derivation KeyDerivation) (UDPClientPacketProcessor, error) {
reqSeparateHeaderPsk := psk
if ipsk != nil {
reqSeparateHeaderPsk = ipsk[0]
}
reqSeparateHeaderCipher, err := aes.NewCipher(reqSeparateHeaderPsk)
if err != nil {
return nil, newError("failed to create AES cipher").Base(err)
}
respSeparateHeaderCipher, err := aes.NewCipher(psk)
if err != nil {
return nil, newError("failed to create AES cipher").Base(err)
}
getPacketAEAD := func(sessionID []byte) cipher.AEAD {
sessionKey := make([]byte, A.GetSessionSubKeyAndSaltLength())
derivation.GetSessionSubKey(psk, sessionID, sessionKey)
block, err := aes.NewCipher(sessionKey)
aead, err := cipher.NewGCM(block)
if err != nil {
panic(err)
}
return aead
}
eihGenerator := newAESEIHGeneratorContainer(len(ipsk), psk, ipsk)
getEIH := func(mask []byte) ExtensibleIdentityHeaders {
eih, err := eihGenerator.GenerateEIHUDP(derivation, A, mask)
if err != nil {
newError("failed to generate EIH").Base(err).WriteToLog()
}
return eih
}
return NewAESUDPClientPacketProcessor(reqSeparateHeaderCipher, respSeparateHeaderCipher, getPacketAEAD, getEIH), nil
}

View File

@ -3,6 +3,7 @@ package shadowsocks2022
import (
"crypto/cipher"
"github.com/lunixbochs/struc"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/protocol"
"io"
@ -19,6 +20,7 @@ type Method interface {
GetSessionSubKeyAndSaltLength() int
GetStreamAEAD(SessionSubKey []byte) (cipher.AEAD, error)
GenerateEIH(CurrentIdentitySubKey []byte, nextPskHash []byte, out []byte) error
GetUDPClientProcessor(ipsk [][]byte, psk []byte, derivation KeyDerivation) (UDPClientPacketProcessor, error)
}
type ExtensibleIdentityHeaders interface {
@ -86,3 +88,27 @@ var addrParser = protocol.NewAddressParser(
protocol.AddressFamilyByte(0x04, net.AddressFamilyIPv6),
protocol.AddressFamilyByte(0x03, net.AddressFamilyDomain),
)
type UDPRequest struct {
SessionID [8]byte
PacketID uint64
TimeStamp uint64
Address DestinationAddress
Port int
Payload *buf.Buffer
}
type UDPResponse struct {
UDPRequest
ClientSessionID [8]byte
}
const UDPHeaderTypeClientToServerStream = byte(0x00)
const UDPHeaderTypeServerToClientStream = byte(0x01)
// UDPClientPacketProcessor
// Caller retain and receive all ownership of the buffer
type UDPClientPacketProcessor interface {
EncodeUDPRequest(request *UDPRequest, out *buf.Buffer) error
DecodeUDPResp(input []byte, resp *UDPResponse) error
}

View File

@ -0,0 +1,157 @@
package shadowsocks2022
import (
"bytes"
"crypto/cipher"
"github.com/lunixbochs/struc"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
"io"
)
type AESUDPClientPacketProcessor struct {
requestSeparateHeaderBlockCipher cipher.Block
responseSeparateHeaderBlockCipher cipher.Block
mainPacketAEAD func([]byte) cipher.AEAD
EIHGenerator func([]byte) ExtensibleIdentityHeaders
}
func NewAESUDPClientPacketProcessor(requestSeparateHeaderBlockCipher, responseSeparateHeaderBlockCipher cipher.Block, mainPacketAEAD func([]byte) cipher.AEAD, eih func([]byte) ExtensibleIdentityHeaders) *AESUDPClientPacketProcessor {
return &AESUDPClientPacketProcessor{
requestSeparateHeaderBlockCipher: requestSeparateHeaderBlockCipher,
responseSeparateHeaderBlockCipher: responseSeparateHeaderBlockCipher,
mainPacketAEAD: mainPacketAEAD,
EIHGenerator: eih,
}
}
type separateHeader struct {
SessionID [8]byte
PacketID uint64
}
type header struct {
Type byte
TimeStamp uint64
PaddingLength uint16 `struc:"sizeof=Padding"`
Padding []byte
}
type respHeader struct {
Type byte
TimeStamp uint64
ClientSessionID [8]byte
PaddingLength uint16 `struc:"sizeof=Padding"`
Padding []byte
}
func (p *AESUDPClientPacketProcessor) EncodeUDPRequest(request *UDPRequest, out *buf.Buffer) error {
separateHeaderStruct := separateHeader{PacketID: request.PacketID, SessionID: request.SessionID}
separateHeaderBuffer := buf.New()
defer separateHeaderBuffer.Release()
{
err := struc.Pack(separateHeaderBuffer, &separateHeaderStruct)
if err != nil {
return newError("failed to pack separateHeader").Base(err)
}
}
separateHeaderBufferBytes := separateHeaderBuffer.Bytes()
{
encryptedDest := out.Extend(16)
p.requestSeparateHeaderBlockCipher.Encrypt(encryptedDest, separateHeaderBufferBytes)
}
if p.EIHGenerator != nil {
eih := p.EIHGenerator(separateHeaderBufferBytes[0:16])
eihHeader := struct {
EIH ExtensibleIdentityHeaders
}{
EIH: eih,
}
err := struc.Pack(out, &eihHeader)
if err != nil {
return newError("failed to pack eih").Base(err)
}
}
headerStruct := header{
Type: UDPHeaderTypeClientToServerStream,
TimeStamp: request.TimeStamp,
PaddingLength: 0,
Padding: nil,
}
requestBodyBuffer := buf.New()
{
err := struc.Pack(requestBodyBuffer, &headerStruct)
if err != nil {
return newError("failed to header").Base(err)
}
}
{
err := addrParser.WriteAddressPort(requestBodyBuffer, request.Address, net.Port(request.Port))
if err != nil {
return newError("failed to write address port").Base(err)
}
}
{
_, err := io.Copy(requestBodyBuffer, bytes.NewReader(request.Payload.Bytes()))
if err != nil {
return newError("failed to copy payload").Base(err)
}
}
{
mainPacketAEADMaterialized := p.mainPacketAEAD(separateHeaderBufferBytes[0:8])
encryptedDest := out.Extend(int32(mainPacketAEADMaterialized.Overhead()) + requestBodyBuffer.Len())
mainPacketAEADMaterialized.Seal(encryptedDest[:0], separateHeaderBuffer.Bytes()[4:16], requestBodyBuffer.Bytes(), nil)
}
return nil
}
func (p *AESUDPClientPacketProcessor) DecodeUDPResp(input []byte, resp *UDPResponse) error {
separateHeaderBuffer := buf.New()
defer separateHeaderBuffer.Release()
{
encryptedDest := separateHeaderBuffer.Extend(16)
p.responseSeparateHeaderBlockCipher.Decrypt(encryptedDest, input)
}
separateHeaderStruct := separateHeader{}
{
err := struc.Unpack(separateHeaderBuffer, &separateHeaderStruct)
if err != nil {
return newError("failed to unpack separateHeader").Base(err)
}
}
resp.PacketID = separateHeaderStruct.PacketID
resp.SessionID = separateHeaderStruct.SessionID
{
mainPacketAEADMaterialized := p.mainPacketAEAD(separateHeaderBuffer.Bytes()[0:8])
decryptedDestBuffer := buf.New()
decryptedDest := decryptedDestBuffer.Extend(int32(len(input)) - 16 - int32(mainPacketAEADMaterialized.Overhead()))
_, err := mainPacketAEADMaterialized.Open(decryptedDest[:0], separateHeaderBuffer.Bytes()[4:16], input[16:], nil)
if err != nil {
return newError("failed to open main packet").Base(err)
}
decryptedDestReader := bytes.NewReader(decryptedDest)
headerStruct := respHeader{}
{
err := struc.Unpack(decryptedDestReader, &headerStruct)
if err != nil {
return newError("failed to unpack header").Base(err)
}
}
resp.TimeStamp = headerStruct.TimeStamp
var addressReaderBuf = buf.New()
defer addressReaderBuf.Release()
var port net.Port
resp.Address, port, err = addrParser.ReadAddressPort(addressReaderBuf, decryptedDestReader)
if err != nil {
return newError("failed to read address port").Base(err)
}
resp.Port = int(port)
readedLength := decryptedDestReader.Size() - int64(decryptedDestReader.Len())
decryptedDestBuffer.Advance(int32(readedLength))
resp.Payload = decryptedDestBuffer
resp.ClientSessionID = headerStruct.ClientSessionID
return nil
}
}

View File

@ -0,0 +1,51 @@
package udp
import (
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/transport/internet"
)
func NewMonoDestUDPConn(conn internet.AbstractPacketConn, addr net.Addr) *MonoDestUDPConn {
return &MonoDestUDPConn{
AbstractPacketConn: conn,
dest: addr,
}
}
type MonoDestUDPConn struct {
internet.AbstractPacketConn
dest net.Addr
}
func (m *MonoDestUDPConn) ReadMultiBuffer() (buf.MultiBuffer, error) {
buffer := buf.New()
buffer.Extend(2048)
nBytes, _, err := m.AbstractPacketConn.ReadFrom(buffer.Bytes())
if err != nil {
buffer.Release()
return nil, err
}
buffer.Resize(0, int32(nBytes))
return buf.MultiBuffer{buffer}, nil
}
func (m *MonoDestUDPConn) WriteMultiBuffer(buffer buf.MultiBuffer) error {
for _, b := range buffer {
_, err := m.AbstractPacketConn.WriteTo(b.Bytes(), m.dest)
if err != nil {
return err
}
}
buf.ReleaseMulti(buffer)
return nil
}
func (m *MonoDestUDPConn) Read(p []byte) (n int, err error) {
n, _, err = m.AbstractPacketConn.ReadFrom(p)
return
}
func (m *MonoDestUDPConn) Write(p []byte) (n int, err error) {
return m.AbstractPacketConn.WriteTo(p, m.dest)
}