diff --git a/.vscode/settings.json b/.vscode/settings.json index aa5c5bd95..5897b1f49 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,7 +7,8 @@ "--enable-gc", "--no-config", "--exclude=.*\\.pb\\.go", - "--disable=gas" + "--disable=gas", + "--disable=gocyclo" ], "go.formatTool": "goimports", diff --git a/app/commander/commander.go b/app/commander/commander.go index 1c7036887..20c0f02d8 100644 --- a/app/commander/commander.go +++ b/app/commander/commander.go @@ -50,7 +50,7 @@ func (c *Commander) Start() error { if err != nil { return err } - rawService, err := c.v.CreateObject(config) + rawService, err := core.CreateObject(c.v, config) if err != nil { return err } diff --git a/app/proxyman/command/command.go b/app/proxyman/command/command.go index e80f3d2c2..ca3db17f9 100755 --- a/app/proxyman/command/command.go +++ b/app/proxyman/command/command.go @@ -62,7 +62,7 @@ type handlerServer struct { } func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) { - rawHandler, err := s.s.CreateObject(request.Inbound) + rawHandler, err := core.CreateObject(s.s, request.Inbound) if err != nil { return nil, err } @@ -96,7 +96,7 @@ func (s *handlerServer) AlterInbound(ctx context.Context, request *AlterInboundR } func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundRequest) (*AddOutboundResponse, error) { - rawHandler, err := s.s.CreateObject(request.Outbound) + rawHandler, err := core.CreateObject(s.s, request.Outbound) if err != nil { return nil, err } diff --git a/app/proxyman/inbound/always.go b/app/proxyman/inbound/always.go index 92a2d604e..310444bda 100644 --- a/app/proxyman/inbound/always.go +++ b/app/proxyman/inbound/always.go @@ -9,6 +9,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/dice" "v2ray.com/core/common/net" + "v2ray.com/core/common/serial" "v2ray.com/core/proxy" ) @@ -113,10 +114,18 @@ func (h *AlwaysOnInboundHandler) Start() error { } func (h *AlwaysOnInboundHandler) Close() error { + var errors []interface{} for _, worker := range h.workers { - worker.Close() + if err := worker.Close(); err != nil { + errors = append(errors, err) + } + } + if err := h.mux.Close(); err != nil { + errors = append(errors, err) + } + if len(errors) > 0 { + return newError("failed to close all resources").Base(newError(serial.Concat(errors...))) } - h.mux.Close() return nil } diff --git a/app/proxyman/inbound/dynamic.go b/app/proxyman/inbound/dynamic.go index 49a4d76e7..f6bde0b6a 100644 --- a/app/proxyman/inbound/dynamic.go +++ b/app/proxyman/inbound/dynamic.go @@ -69,7 +69,9 @@ func (h *DynamicInboundHandler) closeWorkers(workers []worker) { ports2Del := make([]net.Port, len(workers)) for idx, worker := range workers { ports2Del[idx] = worker.Port() - worker.Close() + if err := worker.Close(); err != nil { + newError("failed to close worker").Base(err).WriteToLog() + } } h.portMutex.Lock() @@ -95,7 +97,7 @@ func (h *DynamicInboundHandler) refresh() error { for i := uint32(0); i < concurrency; i++ { port := h.allocatePort() - rawProxy, err := h.v.CreateObject(h.proxyConfig) + rawProxy, err := core.CreateObject(h.v, h.proxyConfig) if err != nil { newError("failed to create proxy instance").Base(err).AtWarning().WriteToLog() continue diff --git a/app/proxyman/inbound/inbound.go b/app/proxyman/inbound/inbound.go index 8ebf3134d..f251d2e9c 100644 --- a/app/proxyman/inbound/inbound.go +++ b/app/proxyman/inbound/inbound.go @@ -9,6 +9,7 @@ import ( "v2ray.com/core" "v2ray.com/core/app/proxyman" "v2ray.com/core/common" + "v2ray.com/core/common/serial" ) // Manager is to manage all inbound handlers. @@ -110,11 +111,20 @@ func (m *Manager) Close() error { m.running = false + var errors []interface{} for _, handler := range m.taggedHandlers { - handler.Close() + if err := handler.Close(); err != nil { + errors = append(errors, err) + } } for _, handler := range m.untaggedHandler { - handler.Close() + if err := handler.Close(); err != nil { + errors = append(errors, err) + } + } + + if len(errors) > 0 { + return newError("failed to close all handlers").Base(newError(serial.Concat(errors...))) } return nil diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 9cf44c4db..2e672da4f 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -12,8 +12,10 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/serial" "v2ray.com/core/common/session" "v2ray.com/core/common/signal/done" + "v2ray.com/core/common/task" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tcp" @@ -97,9 +99,17 @@ func (w *tcpWorker) Start() error { } func (w *tcpWorker) Close() error { + var errors []interface{} if w.hub != nil { - common.Close(w.hub) - common.Close(w.proxy) + if err := common.Close(w.hub); err != nil { + errors = append(errors, err) + } + if err := common.Close(w.proxy); err != nil { + errors = append(errors, err) + } + } + if len(errors) > 0 { + return newError("failed to close all resources").Base(newError(serial.Concat(errors...))) } return nil @@ -227,7 +237,7 @@ type udpWorker struct { uplinkCounter core.StatCounter downlinkCounter core.StatCounter - done *done.Instance + checker *task.Periodic activeConn map[connID]*udpConn } @@ -295,7 +305,7 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest if err := w.proxy.Process(ctx, net.Network_UDP, conn, w.dispatcher); err != nil { newError("connection ends").Base(err).WriteToLog() } - conn.Close() + conn.Close() // nolint: errcheck w.removeConn(id) }() } @@ -309,12 +319,37 @@ func (w *udpWorker) removeConn(id connID) { func (w *udpWorker) Start() error { w.activeConn = make(map[connID]*udpConn, 16) - w.done = done.New() h, err := udp.ListenUDP(w.address, w.port, w.callback, udp.HubReceiveOriginalDestination(w.recvOrigDest), udp.HubCapacity(256)) if err != nil { return err } - go w.monitor() + w.checker = &task.Periodic{ + Interval: time.Second * 16, + Execute: func() error { + nowSec := time.Now().Unix() + w.Lock() + if len(w.activeConn) == 0 { + return nil + } + + for addr, conn := range w.activeConn { + if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 { + delete(w.activeConn, addr) + conn.Close() // nolint: errcheck + } + } + + if len(w.activeConn) == 0 { + w.activeConn = make(map[connID]*udpConn, 16) + } + w.Unlock() + + return nil + }, + } + if err := w.checker.Start(); err != nil { + return err + } w.hub = h return nil } @@ -323,38 +358,28 @@ func (w *udpWorker) Close() error { w.Lock() defer w.Unlock() + var errors []interface{} + if w.hub != nil { - w.hub.Close() - } - - if w.done != nil { - common.Must(w.done.Close()) - } - - common.Close(w.proxy) - return nil -} - -func (w *udpWorker) monitor() { - timer := time.NewTicker(time.Second * 16) - defer timer.Stop() - - for { - select { - case <-w.done.Wait(): - return - case <-timer.C: - nowSec := time.Now().Unix() - w.Lock() - for addr, conn := range w.activeConn { - if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 { - delete(w.activeConn, addr) - conn.Close() - } - } - w.Unlock() + if err := w.hub.Close(); err != nil { + errors = append(errors, err) } } + + if w.checker != nil { + if err := w.checker.Close(); err != nil { + errors = append(errors, err) + } + } + + if err := common.Close(w.proxy); err != nil { + errors = append(errors, err) + } + + if len(errors) > 0 { + return newError("failed to close all resources").Base(newError(serial.Concat(errors...))) + } + return nil } func (w *udpWorker) Port() net.Port { diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 0543bbf8f..b1e989465 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -327,10 +327,12 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil } +// Start implements common.Runnable. func (s *Server) Start() error { return nil } +// Close implements common.Closable. func (s *Server) Close() error { return nil } @@ -463,7 +465,7 @@ func (w *ServerWorker) run(ctx context.Context) { input := w.link.Reader reader := &buf.BufferedReader{Reader: input} - defer w.sessionManager.Close() + defer w.sessionManager.Close() // nolint: errcheck for { select { diff --git a/common/buf/writer.go b/common/buf/writer.go index 73d0538cd..1a4192e84 100644 --- a/common/buf/writer.go +++ b/common/buf/writer.go @@ -12,13 +12,6 @@ type BufferToBytesWriter struct { io.Writer } -// NewBufferToBytesWriter returns a new BufferToBytesWriter. -func NewBufferToBytesWriter(writer io.Writer) *BufferToBytesWriter { - return &BufferToBytesWriter{ - Writer: writer, - } -} - // WriteMultiBuffer implements Writer. This method takes ownership of the given buffer. func (w *BufferToBytesWriter) WriteMultiBuffer(mb MultiBuffer) error { defer mb.Release() @@ -113,7 +106,16 @@ func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error { // Flush flushes buffered content into underlying writer. func (w *BufferedWriter) Flush() error { if !w.buffer.IsEmpty() { - if err := w.writer.WriteMultiBuffer(NewMultiBufferValue(w.buffer)); err != nil { + b := w.buffer + w.buffer = nil + + if writer, ok := w.writer.(io.Writer); ok { + _, err := writer.Write(b.Bytes()) + b.Release() + if err != nil { + return err + } + } else if err := w.writer.WriteMultiBuffer(NewMultiBufferValue(b)); err != nil { return err } diff --git a/dns.go b/dns.go index 5546f1156..ed618ef7d 100644 --- a/dns.go +++ b/dns.go @@ -55,6 +55,6 @@ func (d *syncDNSClient) Set(client DNSClient) { d.Lock() defer d.Unlock() - common.Close(d.DNSClient) + common.Close(d.DNSClient) // nolint: errcheck d.DNSClient = client } diff --git a/functions.go b/functions.go new file mode 100644 index 000000000..feeab026d --- /dev/null +++ b/functions.go @@ -0,0 +1,16 @@ +package core + +import ( + "context" + + "v2ray.com/core/common" +) + +// CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil. +func CreateObject(v *Instance, config interface{}) (interface{}, error) { + ctx := context.Background() + if v != nil { + ctx = context.WithValue(ctx, v2rayKey, v) + } + return common.CreateObject(ctx, config) +} diff --git a/network.go b/network.go index a164e54ac..2f8513672 100644 --- a/network.go +++ b/network.go @@ -90,7 +90,7 @@ func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) { m.Lock() defer m.Unlock() - common.Close(m.InboundHandlerManager) + common.Close(m.InboundHandlerManager) // nolint: errcheck m.InboundHandlerManager = manager } @@ -172,6 +172,6 @@ func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) { m.Lock() defer m.Unlock() - common.Close(m.OutboundHandlerManager) + common.Close(m.OutboundHandlerManager) // nolint: errcheck m.OutboundHandlerManager = manager } diff --git a/policy.go b/policy.go index 639752410..177d9fdf4 100644 --- a/policy.go +++ b/policy.go @@ -35,6 +35,7 @@ type BufferPolicy struct { PerConnection int32 } +// SystemStatsPolicy contains stat policy settings on system level. type SystemStatsPolicy struct { // Whether or not to enable stat counter for uplink traffic in inbound handlers. InboundUplink bool @@ -42,6 +43,7 @@ type SystemStatsPolicy struct { InboundDownlink bool } +// SystemPolicy contains policy settings at system level. type SystemPolicy struct { Stats SystemStatsPolicy Buffer BufferPolicy @@ -178,6 +180,6 @@ func (m *syncPolicyManager) Set(manager PolicyManager) { m.Lock() defer m.Unlock() - common.Close(m.PolicyManager) + common.Close(m.PolicyManager) // nolint: errcheck m.PolicyManager = manager } diff --git a/proxy/http/server.go b/proxy/http/server.go index 21b5c1413..f09ae875f 100755 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -10,10 +10,6 @@ import ( "strings" "time" - "v2ray.com/core/common/task" - - "v2ray.com/core/transport/pipe" - "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" @@ -22,7 +18,9 @@ import ( "v2ray.com/core/common/net" http_proto "v2ray.com/core/common/protocol/http" "v2ray.com/core/common/signal" + "v2ray.com/core/common/task" "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/pipe" ) // Server is an HTTP proxy server. @@ -114,7 +112,7 @@ Start: if err != nil { trace := newError("failed to read http request").Base(err) if errors.Cause(err) != io.EOF && !isTimeout(errors.Cause(err)) { - trace.AtWarning() + trace.AtWarning() // nolint: errcheck } return trace } @@ -258,7 +256,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri } // Plain HTTP request is not a stream. The request always finishes before response. Hense request has to be closed later. - defer common.Close(link.Writer) + defer common.Close(link.Writer) // nolint: errcheck var result error = errWaitAnother requestDone := func() error { diff --git a/router.go b/router.go index 7913deea7..692a8a370 100644 --- a/router.go +++ b/router.go @@ -67,7 +67,7 @@ func (d *syncDispatcher) Set(disp Dispatcher) { d.Lock() defer d.Unlock() - common.Close(d.Dispatcher) + common.Close(d.Dispatcher) // nolint: errorcheck d.Dispatcher = disp } @@ -126,6 +126,6 @@ func (r *syncRouter) Set(router Router) { r.Lock() defer r.Unlock() - common.Close(r.Router) + common.Close(r.Router) // nolint: errcheck r.Router = router } diff --git a/stats.go b/stats.go index 7ec0c8146..2c7d50828 100644 --- a/stats.go +++ b/stats.go @@ -81,7 +81,7 @@ func (s *syncStatManager) Set(m StatManager) { defer s.Unlock() if s.StatManager != nil { - s.StatManager.Close() + s.StatManager.Close() // nolint: errcheck } s.StatManager = m } diff --git a/transport/internet/websocket/connection.go b/transport/internet/websocket/connection.go index f9e334fc9..6f55a8c16 100644 --- a/transport/internet/websocket/connection.go +++ b/transport/internet/websocket/connection.go @@ -9,6 +9,7 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" + "v2ray.com/core/common/serial" ) var ( @@ -70,7 +71,7 @@ func (c *connection) Write(b []byte) (int, error) { func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error { if c.mergingWriter == nil { - c.mergingWriter = buf.NewBufferedWriter(buf.NewBufferToBytesWriter(c)) + c.mergingWriter = buf.NewBufferedWriter(&buf.BufferToBytesWriter{Writer: c}) } if err := c.mergingWriter.WriteMultiBuffer(mb); err != nil { return err @@ -79,8 +80,17 @@ func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error { } func (c *connection) Close() error { - c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second*5)) - return c.conn.Close() + var errors []interface{} + if err := c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second*5)); err != nil { + errors = append(errors, err) + } + if err := c.conn.Close(); err != nil { + errors = append(errors, err) + } + if len(errors) > 0 { + return newError("failed to close connection").Base(newError(serial.Concat(errors...))) + } + return nil } func (c *connection) LocalAddr() net.Addr { diff --git a/transport/internet/websocket/hub.go b/transport/internet/websocket/hub.go index 5a2591fd4..57c714467 100644 --- a/transport/internet/websocket/hub.go +++ b/transport/internet/websocket/hub.go @@ -7,7 +7,6 @@ import ( "strconv" "sync" "time" - "websocket" "v2ray.com/core/common" diff --git a/v2ray.go b/v2ray.go index 72c32628c..c6214f2d7 100755 --- a/v2ray.go +++ b/v2ray.go @@ -5,6 +5,7 @@ import ( "sync" "v2ray.com/core/common" + "v2ray.com/core/common/serial" "v2ray.com/core/common/uuid" ) @@ -53,13 +54,13 @@ func New(config *Config) (*Instance, error) { if err != nil { return nil, err } - if _, err := server.CreateObject(settings); err != nil { + if _, err := CreateObject(server, settings); err != nil { return nil, err } } for _, inbound := range config.Inbound { - rawHandler, err := server.CreateObject(inbound) + rawHandler, err := CreateObject(server, inbound) if err != nil { return nil, err } @@ -73,7 +74,7 @@ func New(config *Config) (*Instance, error) { } for _, outbound := range config.Outbound { - rawHandler, err := server.CreateObject(outbound) + rawHandler, err := CreateObject(server, outbound) if err != nil { return nil, err } @@ -89,11 +90,6 @@ func New(config *Config) (*Instance, error) { return server, nil } -func (s *Instance) CreateObject(config interface{}) (interface{}, error) { - ctx := context.WithValue(context.Background(), v2rayKey, s) - return common.CreateObject(ctx, config) -} - // ID returns a unique ID for this V2Ray instance. func (s *Instance) ID() uuid.UUID { return s.id @@ -105,8 +101,15 @@ func (s *Instance) Close() error { defer s.access.Unlock() s.running = false + + var errors []interface{} for _, f := range s.allFeatures() { - f.Close() + if err := f.Close(); err != nil { + errors = append(errors, err) + } + } + if len(errors) > 0 { + return newError("failed to close all features").Base(newError(serial.Concat(errors...))) } return nil