From b52725cf659e0f7a38fed2eb36a5a792843bd54f Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sat, 5 Jan 2019 21:43:22 +0100 Subject: [PATCH] DialUDP function --- app/dns/udpns.go | 6 +- app/proxyman/inbound/worker.go | 2 +- functions.go | 15 ++ functions_test.go | 170 ++++++++++++++++++++++ proxy/shadowsocks/server.go | 4 +- proxy/socks/server.go | 4 +- transport/internet/udp/dispatcher.go | 91 +++++++++++- transport/internet/udp/dispatcher_test.go | 3 +- 8 files changed, 285 insertions(+), 10 deletions(-) create mode 100644 functions_test.go diff --git a/app/dns/udpns.go b/app/dns/udpns.go index 2b2ba0f7d..817cd38a9 100644 --- a/app/dns/udpns.go +++ b/app/dns/udpns.go @@ -9,9 +9,9 @@ import ( "golang.org/x/net/dns/dnsmessage" "v2ray.com/core/common" - "v2ray.com/core/common/buf" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol/dns" + udp_proto "v2ray.com/core/common/protocol/udp" "v2ray.com/core/common/session" "v2ray.com/core/common/signal/pubsub" "v2ray.com/core/common/task" @@ -101,7 +101,9 @@ func (s *ClassicNameServer) Cleanup() error { return nil } -func (s *ClassicNameServer) HandleResponse(ctx context.Context, payload *buf.Buffer) { +func (s *ClassicNameServer) HandleResponse(ctx context.Context, packet *udp_proto.Packet) { + payload := packet.Payload + var parser dnsmessage.Parser header, err := parser.Start(payload.Bytes()) if err != nil { diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 21e3cb2ab..62bb559b6 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -313,7 +313,7 @@ func (w *udpWorker) removeConn(id connID) { func (w *udpWorker) handlePackets() { receive := w.hub.Receive() for payload := range receive { - w.callback(payload.Content, payload.Source, payload.OriginalDestination) + w.callback(payload.Payload, payload.Source, payload.Target) } } diff --git a/functions.go b/functions.go index 6c950102e..f8d7f000b 100644 --- a/functions.go +++ b/functions.go @@ -7,6 +7,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" "v2ray.com/core/features/routing" + "v2ray.com/core/transport/internet/udp" ) // CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil. @@ -54,3 +55,17 @@ func Dial(ctx context.Context, v *Instance, dest net.Destination) (net.Conn, err } return net.NewConnection(net.ConnectionInputMulti(r.Writer), net.ConnectionOutputMulti(r.Reader)), nil } + +// DialUDP provides a way to exchange UDP packets through V2Ray instance to remote servers. +// Since it is under a proxy context, the LocalAddr() in returned PacketConn will not show the real address. +// +// TODO: SetDeadline() / SetReadDeadline() / SetWriteDeadline() are not implemented. +// +// v2ray:api:beta +func DialUDP(ctx context.Context, v *Instance) (net.PacketConn, error) { + dispatcher := v.GetFeature(routing.DispatcherType()) + if dispatcher == nil { + return nil, newError("routing.Dispatcher is not registered in V2Ray core") + } + return udp.DialDispatcher(ctx, dispatcher.(routing.Dispatcher)) +} diff --git a/functions_test.go b/functions_test.go new file mode 100644 index 000000000..b5a25ed32 --- /dev/null +++ b/functions_test.go @@ -0,0 +1,170 @@ +package core_test + +import ( + "context" + "crypto/rand" + "io" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/google/go-cmp/cmp" + + "v2ray.com/core" + "v2ray.com/core/app/dispatcher" + "v2ray.com/core/app/proxyman" + "v2ray.com/core/common" + "v2ray.com/core/common/net" + "v2ray.com/core/common/serial" + "v2ray.com/core/proxy/freedom" + "v2ray.com/core/testing/servers/tcp" + "v2ray.com/core/testing/servers/udp" +) + +func xor(b []byte) []byte { + r := make([]byte, len(b)) + for i, v := range b { + r[i] = v ^ 'c' + } + return r +} + +func xor2(b []byte) []byte { + r := make([]byte, len(b)) + for i, v := range b { + r[i] = v ^ 'd' + } + return r +} + +func TestV2RayDial(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + defer tcpServer.Close() + + config := &core.Config{ + App: []*serial.TypedMessage{ + serial.ToTypedMessage(&dispatcher.Config{}), + serial.ToTypedMessage(&proxyman.InboundConfig{}), + serial.ToTypedMessage(&proxyman.OutboundConfig{}), + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&freedom.Config{}), + }, + }, + } + + cfgBytes, err := proto.Marshal(config) + common.Must(err) + + server, err := core.StartInstance("protobuf", cfgBytes) + common.Must(err) + defer server.Close() + + conn, err := core.Dial(context.Background(), server, dest) + common.Must(err) + defer conn.Close() + + const size = 10240 * 1024 + payload := make([]byte, size) + common.Must2(rand.Read(payload)) + + if _, err := conn.Write(payload); err != nil { + t.Fatal(err) + } + + receive := make([]byte, size) + if _, err := io.ReadFull(conn, receive); err != nil { + t.Fatal("failed to read all response: ", err) + } + + if r := cmp.Diff(xor(receive), payload); r != "" { + t.Error(r) + } +} + +func TestV2RayDialUDP(t *testing.T) { + udpServer1 := udp.Server{ + MsgProcessor: xor, + } + dest1, err := udpServer1.Start() + common.Must(err) + defer udpServer1.Close() + + udpServer2 := udp.Server{ + MsgProcessor: xor2, + } + dest2, err := udpServer2.Start() + common.Must(err) + defer udpServer2.Close() + + config := &core.Config{ + App: []*serial.TypedMessage{ + serial.ToTypedMessage(&dispatcher.Config{}), + serial.ToTypedMessage(&proxyman.InboundConfig{}), + serial.ToTypedMessage(&proxyman.OutboundConfig{}), + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&freedom.Config{}), + }, + }, + } + + cfgBytes, err := proto.Marshal(config) + common.Must(err) + + server, err := core.StartInstance("protobuf", cfgBytes) + common.Must(err) + defer server.Close() + + conn, err := core.DialUDP(context.Background(), server) + common.Must(err) + defer conn.Close() + + const size = 1024 + { + payload := make([]byte, size) + common.Must2(rand.Read(payload)) + + if _, err := conn.WriteTo(payload, &net.UDPAddr{ + IP: dest1.Address.IP(), + Port: int(dest1.Port), + }); err != nil { + t.Fatal(err) + } + + receive := make([]byte, size) + if _, _, err := conn.ReadFrom(receive); err != nil { + t.Fatal(err) + } + + if r := cmp.Diff(xor(receive), payload); r != "" { + t.Error(r) + } + } + + { + payload := make([]byte, size) + common.Must2(rand.Read(payload)) + + if _, err := conn.WriteTo(payload, &net.UDPAddr{ + IP: dest2.Address.IP(), + Port: int(dest2.Port), + }); err != nil { + t.Fatal(err) + } + + receive := make([]byte, size) + if _, _, err := conn.ReadFrom(receive); err != nil { + t.Fatal(err) + } + + if r := cmp.Diff(xor2(receive), payload); r != "" { + t.Error(r) + } + } +} diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 44c6d7966..8de8e2800 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -10,6 +10,7 @@ import ( "v2ray.com/core/common/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + udp_proto "v2ray.com/core/common/protocol/udp" "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" @@ -69,12 +70,13 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet } func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error { - udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) { + udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) { request := protocol.RequestHeaderFromContext(ctx) if request == nil { return } + payload := packet.Payload data, err := EncodeUDPPacket(request, payload.Bytes()) payload.Release() if err != nil { diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 26821dd6f..6e05bbf22 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -11,6 +11,7 @@ import ( "v2ray.com/core/common/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + udp_proto "v2ray.com/core/common/protocol/udp" "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" @@ -174,7 +175,8 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ } func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error { - udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) { + udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) { + payload := packet.Payload newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx)) request := protocol.RequestHeaderFromContext(ctx) diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 792deddf0..f83bf6a6f 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -2,19 +2,23 @@ package udp import ( "context" + "io" "sync" "time" + "v2ray.com/core/common/signal/done" + "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/protocol/udp" "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/features/routing" "v2ray.com/core/transport" ) -type ResponseCallback func(ctx context.Context, payload *buf.Buffer) +type ResponseCallback func(ctx context.Context, packet *udp.Packet) type connEntry struct { link *transport.Link @@ -70,7 +74,7 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *c cancel: removeRay, } v.conns[dest] = entry - go handleInput(ctx, entry, v.callback) + go handleInput(ctx, entry, dest, v.callback) return entry } @@ -89,7 +93,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, } } -func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) { +func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback) { defer conn.cancel() input := conn.link.Reader @@ -109,7 +113,86 @@ func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback } timer.Update() for _, b := range mb { - callback(ctx, b) + callback(ctx, &udp.Packet{ + Payload: b, + Source: dest, + }) } } } + +type dispatcherConn struct { + dispatcher *Dispatcher + cache chan *udp.Packet + done *done.Instance +} + +func DialDispatcher(ctx context.Context, dispatcher routing.Dispatcher) (net.PacketConn, error) { + c := &dispatcherConn{ + cache: make(chan *udp.Packet, 16), + done: done.New(), + } + + d := NewDispatcher(dispatcher, c.callback) + c.dispatcher = d + return c, nil +} + +func (c *dispatcherConn) callback(ctx context.Context, packet *udp.Packet) { + select { + case <-c.done.Wait(): + packet.Payload.Release() + return + case c.cache <- packet: + default: + packet.Payload.Release() + return + } +} + +func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) { + select { + case <-c.done.Wait(): + return 0, nil, io.EOF + case packet := <-c.cache: + n := copy(p, packet.Payload.Bytes()) + return n, &net.UDPAddr{ + IP: packet.Source.Address.IP(), + Port: int(packet.Source.Port), + }, nil + } +} + +func (c *dispatcherConn) WriteTo(p []byte, addr net.Addr) (int, error) { + buffer := buf.New() + raw := buffer.Extend(buf.Size) + n := copy(raw, p) + buffer.Resize(0, int32(n)) + + ctx := context.Background() + c.dispatcher.Dispatch(ctx, net.DestinationFromAddr(addr), buffer) + return n, nil +} + +func (c *dispatcherConn) Close() error { + return c.done.Close() +} + +func (c *dispatcherConn) LocalAddr() net.Addr { + return &net.UDPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 0, + } +} + +func (c *dispatcherConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *dispatcherConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *dispatcherConn) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/transport/internet/udp/dispatcher_test.go b/transport/internet/udp/dispatcher_test.go index 329b7b562..4dcd60b46 100644 --- a/transport/internet/udp/dispatcher_test.go +++ b/transport/internet/udp/dispatcher_test.go @@ -8,6 +8,7 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/protocol/udp" "v2ray.com/core/features/routing" "v2ray.com/core/transport" . "v2ray.com/core/transport/internet/udp" @@ -66,7 +67,7 @@ func TestSameDestinationDispatching(t *testing.T) { b.WriteString("abcd") var msgCount uint32 - dispatcher := NewDispatcher(td, func(ctx context.Context, payload *buf.Buffer) { + dispatcher := NewDispatcher(td, func(ctx context.Context, packet *udp.Packet) { atomic.AddUint32(&msgCount, 1) })