1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-22 10:08:15 -05:00

apply contex in udp hub

This commit is contained in:
Darien Raymond 2017-02-13 22:39:50 +01:00
parent 10acab0dfe
commit 5040817a7b
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

View File

@ -1,14 +1,13 @@
package udp package udp
import ( import (
"context"
"net" "net"
"sync"
"v2ray.com/core/app/log" "v2ray.com/core/app/log"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/dice" "v2ray.com/core/common/dice"
v2net "v2ray.com/core/common/net" v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/signal"
"v2ray.com/core/transport/internet/internal" "v2ray.com/core/transport/internet/internal"
) )
@ -76,9 +75,8 @@ type ListenOption struct {
} }
type Hub struct { type Hub struct {
sync.RWMutex
conn *net.UDPConn conn *net.UDPConn
cancel *signal.CancelSignal cancel context.CancelFunc
queue *PayloadQueue queue *PayloadQueue
option ListenOption option ListenOption
} }
@ -107,24 +105,20 @@ func ListenUDP(address v2net.Address, port v2net.Port, option ListenOption) (*Hu
return nil, err return nil, err
} }
} }
ctx, cancel := context.WithCancel(context.Background())
hub := &Hub{ hub := &Hub{
conn: udpConn, conn: udpConn,
queue: NewPayloadQueue(option), queue: NewPayloadQueue(option),
option: option, option: option,
cancel: signal.NewCloseSignal(), cancel: cancel,
} }
go hub.start() go hub.start(ctx)
return hub, nil return hub, nil
} }
func (v *Hub) Close() { func (v *Hub) Close() {
v.Lock() v.cancel()
defer v.Unlock()
v.cancel.Cancel()
v.conn.Close() v.conn.Close()
v.cancel.WaitForDone()
v.queue.Close()
} }
func (v *Hub) WriteTo(payload []byte, dest v2net.Destination) (int, error) { func (v *Hub) WriteTo(payload []byte, dest v2net.Destination) (int, error) {
@ -134,12 +128,9 @@ func (v *Hub) WriteTo(payload []byte, dest v2net.Destination) (int, error) {
}) })
} }
func (v *Hub) start() { func (v *Hub) start(ctx context.Context) {
v.cancel.WaitThread()
defer v.cancel.FinishThread()
oobBytes := make([]byte, 256) oobBytes := make([]byte, 256)
for v.Running() { for range ctx.Done() {
buffer := buf.NewSmall() buffer := buf.NewSmall()
var noob int var noob int
var addr *net.UDPAddr var addr *net.UDPAddr
@ -165,13 +156,10 @@ func (v *Hub) start() {
} }
v.queue.Enqueue(payload) v.queue.Enqueue(payload)
} }
v.queue.Close()
} }
func (v *Hub) Running() bool { // Connection returns the net.Conn underneath this hub.
return !v.cancel.Cancelled()
}
// Connection return the net.Conn underneath this hub.
// Private: Visible for testing only // Private: Visible for testing only
func (v *Hub) Connection() net.Conn { func (v *Hub) Connection() net.Conn {
return v.conn return v.conn