From 210a32dc121abd71e6cfea71887cac251b05f038 Mon Sep 17 00:00:00 2001 From: v2ray Date: Mon, 15 Aug 2016 17:44:46 +0200 Subject: [PATCH] support udp redirection --- proxy/dokodemo/dokodemo.go | 14 +++++++--- proxy/shadowsocks/server.go | 5 ++-- proxy/socks/server_udp.go | 5 ++-- transport/internet/internal/sysfd.go | 25 +++++++++++++++++ transport/internet/kcp/listener.go | 7 +++-- transport/internet/tcp/connection.go | 24 ++-------------- transport/internet/udp/hub_linux.go | 34 ++++++++++++++++++++++ transport/internet/udp/hub_other.go | 15 ++++++++++ transport/internet/udp/udp.go | 42 ++++++++++++++++++++++------ 9 files changed, 132 insertions(+), 39 deletions(-) create mode 100644 transport/internet/internal/sysfd.go create mode 100644 transport/internet/udp/hub_linux.go create mode 100644 transport/internet/udp/hub_other.go diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 7d38d9f51..3b4ccfe50 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -90,7 +90,7 @@ func (this *DokodemoDoor) Start() error { func (this *DokodemoDoor) ListenUDP() error { this.udpServer = udp.NewUDPServer(this.meta, this.packetDispatcher) - udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets) + udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, udp.ListenOption{Callback: this.handleUDPPackets}) if err != nil { log.Error("Dokodemo failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -101,9 +101,15 @@ func (this *DokodemoDoor) ListenUDP() error { return nil } -func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) { - this.udpServer.Dispatch( - &proxy.SessionInfo{Source: dest, Destination: v2net.UDPDestination(this.address, this.port)}, payload, this.handleUDPResponse) +func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, session *proxy.SessionInfo) { + if session.Destination == nil && this.address != nil && this.port > 0 { + session.Destination = v2net.UDPDestination(this.address, this.port) + } + if session.Destination == nil { + log.Info("Dokodemo: Unknown destination, stop forwarding...") + return + } + this.udpServer.Dispatch(session, payload, this.handleUDPResponse) } func (this *DokodemoDoor) handleUDPResponse(dest v2net.Destination, payload *alloc.Buffer) { diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 12cace63b..cb5ef63b2 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -71,7 +71,7 @@ func (this *Server) Start() error { if this.config.UDP { this.udpServer = udp.NewUDPServer(this.meta, this.packetDispatcher) - udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload) + udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, udp.ListenOption{Callback: this.handlerUDPPayload}) if err != nil { log.Error("Shadowsocks: Failed to listen UDP on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -84,9 +84,10 @@ func (this *Server) Start() error { return nil } -func (this *Server) handlerUDPPayload(payload *alloc.Buffer, source v2net.Destination) { +func (this *Server) handlerUDPPayload(payload *alloc.Buffer, session *proxy.SessionInfo) { defer payload.Release() + source := session.Source ivLen := this.config.Cipher.IVSize() iv := payload.Value[:ivLen] key := this.config.Key diff --git a/proxy/socks/server_udp.go b/proxy/socks/server_udp.go index d25eaf9dc..f02dff113 100644 --- a/proxy/socks/server_udp.go +++ b/proxy/socks/server_udp.go @@ -11,7 +11,7 @@ import ( func (this *Server) listenUDP() error { this.udpServer = udp.NewUDPServer(this.meta, this.packetDispatcher) - udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload) + udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, udp.ListenOption{Callback: this.handleUDPPayload}) if err != nil { log.Error("Socks: Failed to listen on udp ", this.meta.Address, ":", this.meta.Port) return err @@ -23,7 +23,8 @@ func (this *Server) listenUDP() error { return nil } -func (this *Server) handleUDPPayload(payload *alloc.Buffer, source v2net.Destination) { +func (this *Server) handleUDPPayload(payload *alloc.Buffer, session *proxy.SessionInfo) { + source := session.Source log.Info("Socks: Client UDP connection from ", source) request, err := protocol.ReadUDPRequest(payload.Value) payload.Release() diff --git a/transport/internet/internal/sysfd.go b/transport/internet/internal/sysfd.go new file mode 100644 index 000000000..d104f6661 --- /dev/null +++ b/transport/internet/internal/sysfd.go @@ -0,0 +1,25 @@ +package internal + +import ( + "errors" + "net" + "reflect" +) + +var ( + ErrInvalidConn = errors.New("Invalid Connection.") +) + +func GetSysFd(conn net.Conn) (int, error) { + cv := reflect.ValueOf(conn) + switch ce := cv.Elem(); ce.Kind() { + case reflect.Struct: + netfd := ce.FieldByName("conn").FieldByName("fd") + switch fe := netfd.Elem(); fe.Kind() { + case reflect.Struct: + fd := fe.FieldByName("sysfd") + return int(fd.Int()), nil + } + } + return 0, ErrInvalidConn +} diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index c7948bbca..d0aa1163f 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -9,6 +9,7 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/common/serial" + "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/internet/udp" ) @@ -39,7 +40,7 @@ func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) { }, running: true, } - hub, err := udp.ListenUDP(address, port, l.OnReceive) + hub, err := udp.ListenUDP(address, port, udp.ListenOption{Callback: l.OnReceive}) if err != nil { return nil, err } @@ -48,9 +49,11 @@ func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) { return l, nil } -func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) { +func (this *Listener) OnReceive(payload *alloc.Buffer, session *proxy.SessionInfo) { defer payload.Release() + src := session.Source + if valid := this.authenticator.Open(payload); !valid { log.Info("KCP|Listener: discarding invalid payload from ", src) return diff --git a/transport/internet/tcp/connection.go b/transport/internet/tcp/connection.go index 0cf0905b0..2dda79dd6 100644 --- a/transport/internet/tcp/connection.go +++ b/transport/internet/tcp/connection.go @@ -1,15 +1,11 @@ package tcp import ( - "errors" "io" "net" - "reflect" "time" -) -var ( - ErrInvalidConn = errors.New("Invalid Connection.") + "github.com/v2ray/v2ray-core/transport/internet/internal" ) type ConnectionManager interface { @@ -27,7 +23,7 @@ func (this *RawConnection) Reusable() bool { func (this *RawConnection) SetReusable(b bool) {} func (this *RawConnection) SysFd() (int, error) { - return getSysFd(&this.TCPConn) + return internal.GetSysFd(&this.TCPConn) } type Connection struct { @@ -106,19 +102,5 @@ func (this *Connection) Reusable() bool { } func (this *Connection) SysFd() (int, error) { - return getSysFd(this.conn) -} - -func getSysFd(conn net.Conn) (int, error) { - cv := reflect.ValueOf(conn) - switch ce := cv.Elem(); ce.Kind() { - case reflect.Struct: - netfd := ce.FieldByName("conn").FieldByName("fd") - switch fe := netfd.Elem(); fe.Kind() { - case reflect.Struct: - fd := fe.FieldByName("sysfd") - return int(fd.Int()), nil - } - } - return 0, ErrInvalidConn + return internal.GetSysFd(this.conn) } diff --git a/transport/internet/udp/hub_linux.go b/transport/internet/udp/hub_linux.go new file mode 100644 index 000000000..1d9d5b6b8 --- /dev/null +++ b/transport/internet/udp/hub_linux.go @@ -0,0 +1,34 @@ +// +build linux + +package udp + +import ( + "syscall" + + v2net "github.com/v2ray/v2ray-core/common/net" +) + +func SetOriginalDestOptions(fd int) error { + if err := syscall.SetsockoptInt(fd, syscall.SOL_IP, syscall.IP_TRANSPARENT, 1); err != nil { + return err + } + if err := syscall.SetsockoptInt(fd, syscall.SOL_IP, syscall.IP_RECVORIGDSTADDR, 1); err != nil { + return err + } + return nil +} + +func RetrieveOriginalDest(oob []byte) v2net.Destination { + msgs, err := syscall.ParseSocketControlMessage(oob) + if err != nil { + return nil + } + for _, msg := range msgs { + if msg.Header.Level == syscall.SOL_IP && msg.Header.Type == syscall.IP_ORIGDSTADDR { + ip := v2net.IPAddress(msg.Data[4:8]) + port := v2net.PortFromBytes(msg.Data[2:4]) + return v2net.UDPDestination(ip, port) + } + } + return nil +} diff --git a/transport/internet/udp/hub_other.go b/transport/internet/udp/hub_other.go new file mode 100644 index 000000000..6b481f4b4 --- /dev/null +++ b/transport/internet/udp/hub_other.go @@ -0,0 +1,15 @@ +// +build !linux + +package udp + +import ( + v2net "github.com/v2ray/v2ray-core/common/net" +) + +func SetOriginalDestOptions(fd int) error { + return nil +} + +func RetrieveOriginalDest(oob []byte) v2net.Destination { + return nil +} diff --git a/transport/internet/udp/udp.go b/transport/internet/udp/udp.go index 874b8e5dd..a04db0e84 100644 --- a/transport/internet/udp/udp.go +++ b/transport/internet/udp/udp.go @@ -5,19 +5,27 @@ import ( "sync" "github.com/v2ray/v2ray-core/common/alloc" + "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/internet/internal" ) -type UDPPayloadHandler func(*alloc.Buffer, v2net.Destination) +type UDPPayloadHandler func(*alloc.Buffer, *proxy.SessionInfo) type UDPHub struct { sync.RWMutex conn *net.UDPConn - callback UDPPayloadHandler + option ListenOption accepting bool } -func ListenUDP(address v2net.Address, port v2net.Port, callback UDPPayloadHandler) (*UDPHub, error) { +type ListenOption struct { + Callback UDPPayloadHandler + ReceiveOriginalDest bool +} + +func ListenUDP(address v2net.Address, port v2net.Port, option ListenOption) (*UDPHub, error) { udpConn, err := net.ListenUDP("udp", &net.UDPAddr{ IP: address.IP(), Port: int(port), @@ -25,9 +33,21 @@ func ListenUDP(address v2net.Address, port v2net.Port, callback UDPPayloadHandle if err != nil { return nil, err } + if option.ReceiveOriginalDest { + fd, err := internal.GetSysFd(udpConn) + if err != nil { + log.Warning("UDP|Listener: Failed to get fd: ", err) + return nil, err + } + err = SetOriginalDestOptions(fd) + if err != nil { + log.Warning("UDP|Listener: Failed to set socket options: ", err) + return nil, err + } + } hub := &UDPHub{ - conn: udpConn, - callback: callback, + conn: udpConn, + option: option, } go hub.start() return hub, nil @@ -53,16 +73,22 @@ func (this *UDPHub) start() { this.accepting = true this.Unlock() + oobBytes := make([]byte, 256) for this.Running() { buffer := alloc.NewBuffer() - nBytes, addr, err := this.conn.ReadFromUDP(buffer.Value) + nBytes, noob, _, addr, err := this.conn.ReadMsgUDP(buffer.Value, oobBytes) if err != nil { buffer.Release() continue } buffer.Slice(0, nBytes) - dest := v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port)) - go this.callback(buffer, dest) + + session := new(proxy.SessionInfo) + session.Source = v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port)) + if this.option.ReceiveOriginalDest && noob > 0 { + session.Destination = RetrieveOriginalDest(oobBytes[:noob]) + } + go this.option.Callback(buffer, session) } }