From e7aa920c27e686986ff8bd950634df9a2d685596 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Mon, 30 Jan 2017 21:35:34 +0100 Subject: [PATCH] refine dyn port --- app/proxyman/inbound/dynamic.go | 77 ++++++++++++++++----------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/app/proxyman/inbound/dynamic.go b/app/proxyman/inbound/dynamic.go index 3f9928043..351fc08ff 100644 --- a/app/proxyman/inbound/dynamic.go +++ b/app/proxyman/inbound/dynamic.go @@ -12,27 +12,16 @@ import ( "v2ray.com/core/proxy" ) -type workerWithContext struct { - ctx context.Context - cancel context.CancelFunc - worker worker -} - -func (w *workerWithContext) Close() { - w.cancel() - w.worker.Close() -} - type DynamicInboundHandler struct { - sync.Mutex tag string ctx context.Context cancel context.CancelFunc proxyConfig interface{} receiverConfig *proxyman.ReceiverConfig + portMutex sync.Mutex portsInUse map[v2net.Port]bool - worker []*workerWithContext - worker2Recycle []*workerWithContext + workerMutex sync.RWMutex + worker []worker lastRefresh time.Time } @@ -53,8 +42,9 @@ func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *p func (h *DynamicInboundHandler) allocatePort() v2net.Port { from := int(h.receiverConfig.PortRange.From) delta := int(h.receiverConfig.PortRange.To) - from + 1 - h.Lock() - defer h.Unlock() + + h.portMutex.Lock() + defer h.portMutex.Unlock() for { r := dice.Roll(delta) @@ -67,30 +57,34 @@ func (h *DynamicInboundHandler) allocatePort() v2net.Port { } } -func (h *DynamicInboundHandler) refresh() error { - h.lastRefresh = time.Now() - - ports2Del := make([]v2net.Port, 0, 16) - for _, worker := range h.worker2Recycle { +func (h *DynamicInboundHandler) waitAnyCloseWorkers(ctx context.Context, cancel context.CancelFunc, workers []worker, duration time.Duration) { + time.Sleep(duration) + cancel() + ports2Del := make([]v2net.Port, len(workers)) + for idx, worker := range workers { + ports2Del[idx] = worker.Port() worker.Close() - ports2Del = append(ports2Del, worker.worker.Port()) } - h.Lock() + h.portMutex.Lock() for _, port := range ports2Del { delete(h.portsInUse, port) } - h.Unlock() + h.portMutex.Unlock() +} - h.worker2Recycle, h.worker = h.worker, h.worker2Recycle[:0] +func (h *DynamicInboundHandler) refresh() error { + h.lastRefresh = time.Now() + + timeout := time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue()) + ctx, cancel := context.WithTimeout(h.ctx, timeout) + workers := make([]worker, 0, h.receiverConfig.AllocationStrategy.GetConcurrencyValue()) address := h.receiverConfig.Listen.AsAddress() if address == nil { address = v2net.AnyIP } for i := uint32(0); i < h.receiverConfig.AllocationStrategy.GetConcurrencyValue(); i++ { - ctx, cancel := context.WithCancel(h.ctx) - port := h.allocatePort() p, err := proxy.CreateInboundHandler(ctx, h.proxyConfig) if err != nil { @@ -109,13 +103,10 @@ func (h *DynamicInboundHandler) refresh() error { allowPassiveConn: h.receiverConfig.AllowPassiveConnection, } if err := worker.Start(); err != nil { - return err + log.Warning("Proxyman:InboundHandler: Failed to create TCP worker: ", err) + continue } - h.worker = append(h.worker, &workerWithContext{ - ctx: ctx, - cancel: cancel, - worker: worker, - }) + workers = append(workers, worker) } if nl.HasNetwork(v2net.Network_UDP) { @@ -127,16 +118,19 @@ func (h *DynamicInboundHandler) refresh() error { recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, } if err := worker.Start(); err != nil { - return err + log.Warning("Proxyman:InboundHandler: Failed to create UDP worker: ", err) + continue } - h.worker = append(h.worker, &workerWithContext{ - ctx: ctx, - cancel: cancel, - worker: worker, - }) + workers = append(workers, worker) } } + h.workerMutex.Lock() + h.worker = workers + h.workerMutex.Unlock() + + go h.waitAnyCloseWorkers(ctx, cancel, workers, timeout) + return nil } @@ -162,7 +156,10 @@ func (h *DynamicInboundHandler) Close() { } func (h *DynamicInboundHandler) GetRandomInboundProxy() (proxy.Inbound, v2net.Port, int) { + h.workerMutex.RLock() + defer h.workerMutex.RUnlock() + w := h.worker[dice.Roll(len(h.worker))] expire := h.receiverConfig.AllocationStrategy.GetRefreshValue() - uint32(time.Since(h.lastRefresh)/time.Minute) - return w.worker.Proxy(), w.worker.Port(), int(expire) + return w.Proxy(), w.Port(), int(expire) }