diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 84a0c4ee3..a238047f4 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -2,7 +2,6 @@ package dokodemo import ( "io" - "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -22,7 +21,7 @@ type DokodemoDoor struct { port v2net.Port space app.Space tcpListener *hub.TCPListener - udpConn *net.UDPConn + udpHub *hub.UDPHub listeningPort v2net.Port } @@ -47,10 +46,10 @@ func (this *DokodemoDoor) Close() { this.tcpListener = nil this.tcpMutex.Unlock() } - if this.udpConn != nil { - this.udpConn.Close() + if this.udpHub != nil { this.udpMutex.Lock() - this.udpConn = nil + this.udpHub.Close() + this.udpHub = nil this.udpMutex.Unlock() } } @@ -82,52 +81,32 @@ func (this *DokodemoDoor) Listen(port v2net.Port) error { } func (this *DokodemoDoor) ListenUDP(port v2net.Port) error { - udpConn, err := net.ListenUDP("udp", &net.UDPAddr{ - IP: []byte{0, 0, 0, 0}, - Port: int(port), - Zone: "", - }) + udpHub, err := hub.ListenUDP(port, this.handleUDPPackets) if err != nil { log.Error("Dokodemo failed to listen on port ", port, ": ", err) return err } this.udpMutex.Lock() - this.udpConn = udpConn + this.udpHub = udpHub this.udpMutex.Unlock() - go this.handleUDPPackets() return nil } -func (this *DokodemoDoor) handleUDPPackets() { - for this.accepting { - buffer := alloc.NewBuffer() +func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) { + packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), payload, false) + ray := this.space.PacketDispatcher().DispatchToOutbound(packet) + close(ray.InboundInput()) + + for resp := range ray.InboundOutput() { this.udpMutex.RLock() if !this.accepting { this.udpMutex.RUnlock() + resp.Release() return } - nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value) + this.udpHub.WriteTo(resp.Value, dest) this.udpMutex.RUnlock() - buffer.Slice(0, nBytes) - if err != nil { - buffer.Release() - log.Error("Dokodemo failed to read from UDP: ", err) - return - } - - packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), buffer, false) - ray := this.space.PacketDispatcher().DispatchToOutbound(packet) - close(ray.InboundInput()) - - for payload := range ray.InboundOutput() { - this.udpMutex.RLock() - if !this.accepting { - this.udpMutex.RUnlock() - return - } - this.udpConn.WriteToUDP(payload.Value, addr) - this.udpMutex.RUnlock() - } + resp.Release() } } diff --git a/transport/hub/udp.go b/transport/hub/udp.go new file mode 100644 index 000000000..de519fec0 --- /dev/null +++ b/transport/hub/udp.go @@ -0,0 +1,59 @@ +package hub + +import ( + "net" + + "github.com/v2ray/v2ray-core/common/alloc" + v2net "github.com/v2ray/v2ray-core/common/net" +) + +type UDPPayloadHandler func(*alloc.Buffer, v2net.Destination) + +type UDPHub struct { + conn *net.UDPConn + callback UDPPayloadHandler + accepting bool +} + +func ListenUDP(port v2net.Port, callback UDPPayloadHandler) (*UDPHub, error) { + udpConn, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: int(port), + }) + if err != nil { + return nil, err + } + hub := &UDPHub{ + conn: udpConn, + callback: callback, + } + go hub.start() + return hub, nil +} + +func (this *UDPHub) Close() { + this.accepting = false + this.conn.Close() +} + +func (this *UDPHub) WriteTo(payload []byte, dest v2net.Destination) (int, error) { + return this.conn.WriteToUDP(payload, &net.UDPAddr{ + IP: dest.Address().IP(), + Port: int(dest.Port()), + }) +} + +func (this *UDPHub) start() { + this.accepting = true + for this.accepting { + buffer := alloc.NewBuffer() + nBytes, addr, err := this.conn.ReadFromUDP(buffer.Value) + if err != nil { + buffer.Release() + continue + } + buffer.Slice(0, nBytes) + dest := v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port)) + go this.callback(buffer, dest) + } +}