From 5e7fb6d0ddb0cb26336492793576e4dd43c228ef Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 10 Feb 2017 11:41:50 +0100 Subject: [PATCH] refine error handling in retry logic --- app/proxyman/outbound/handler.go | 11 +++++++---- common/retry/retry.go | 10 ++++++++-- common/retry/retry_test.go | 4 ++-- proxy/blackhole/blackhole.go | 9 +++------ proxy/freedom/freedom.go | 3 +-- proxy/shadowsocks/client.go | 5 ++--- proxy/socks/client.go | 5 ++--- proxy/vmess/outbound/outbound.go | 5 ++--- transport/internet/tcp_hub.go | 8 ++------ transport/internet/websocket/dialer.go | 8 ++++---- 10 files changed, 33 insertions(+), 35 deletions(-) diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index c51a9d270..63c1c6f5e 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -66,10 +66,13 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { err := h.proxy.Process(ctx, outboundRay, h) // Ensure outbound ray is properly closed. - if err != nil && errors.Cause(err) != io.EOF { - outboundRay.OutboundOutput().CloseError() - } else { - outboundRay.OutboundOutput().Close() + if err != nil { + log.Warning("Proxyman|OutboundHandler: Failed to process outbound traffic.") + if errors.Cause(err) != io.EOF { + outboundRay.OutboundOutput().CloseError() + } else { + outboundRay.OutboundOutput().Close() + } } outboundRay.OutboundInput().CloseError() } diff --git a/common/retry/retry.go b/common/retry/retry.go index 55288a421..3c8212f21 100644 --- a/common/retry/retry.go +++ b/common/retry/retry.go @@ -2,11 +2,12 @@ package retry import ( "time" + "v2ray.com/core/common/errors" ) var ( - ErrRetryFailed = errors.New("All retry attempts failed.") + ErrRetryFailed = errors.New("Retry: All retry attempts failed.") ) // Strategy is a way to retry on a specific function. @@ -23,16 +24,21 @@ type retryer struct { // On implements Strategy.On. func (r *retryer) On(method func() error) error { attempt := 0 + accumulatedError := make([]error, 0, r.totalAttempt) for attempt < r.totalAttempt { err := method() if err == nil { return nil } + numErrors := len(accumulatedError) + if numErrors == 0 || err.Error() != accumulatedError[numErrors-1].Error() { + accumulatedError = append(accumulatedError, err) + } delay := r.nextDelay() <-time.After(time.Duration(delay) * time.Millisecond) attempt++ } - return ErrRetryFailed + return errors.Base(ErrRetryFailed).Message(accumulatedError) } // Timed returns a retry strategy with fixed interval. diff --git a/common/retry/retry_test.go b/common/retry/retry_test.go index 8448e35f3..23dd249ee 100644 --- a/common/retry/retry_test.go +++ b/common/retry/retry_test.go @@ -73,7 +73,7 @@ func TestRetryExhausted(t *testing.T) { }) duration := time.Since(startTime) - assert.Error(err).Equals(ErrRetryFailed) + assert.Error(errors.Cause(err)).Equals(ErrRetryFailed) assert.Int64(int64(duration / time.Millisecond)).AtLeast(1900) } @@ -88,6 +88,6 @@ func TestExponentialBackoff(t *testing.T) { }) duration := time.Since(startTime) - assert.Error(err).Equals(ErrRetryFailed) + assert.Error(errors.Cause(err)).Equals(ErrRetryFailed) assert.Int64(int64(duration / time.Millisecond)).AtLeast(4000) } diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index 17116d3fe..18b920b22 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -3,7 +3,6 @@ package blackhole import ( "context" - "errors" "time" "v2ray.com/core/common" @@ -11,10 +10,6 @@ import ( "v2ray.com/core/transport/ray" ) -var ( - errConnectionBlocked = errors.New("Blackhole: connection blocked.") -) - // Handler is an outbound connection that sliently swallow the entire payload. type Handler struct { response ResponseConfig @@ -36,7 +31,9 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial v.response.WriteTo(outboundRay.OutboundOutput()) // Sleep a little here to make sure the response is sent to client. time.Sleep(time.Second) - return errConnectionBlocked + outboundRay.OutboundOutput().Close() + outboundRay.OutboundInput().CloseError() + return nil } func init() { diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 47c565dbb..9af1fd7c5 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -102,8 +102,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial return nil }) if err != nil { - log.Warning("Freedom: Failed to open connection to ", destination, ": ", err) - return err + return errors.Base(err).Message("Freedom: Failed to open connection to ", destination) } defer conn.Close() diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index d01948c02..37befc6f2 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -2,7 +2,6 @@ package shadowsocks import ( "context" - "errors" "runtime" "time" @@ -10,6 +9,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" + "v2ray.com/core/common/errors" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" @@ -61,8 +61,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale return nil }) if err != nil { - log.Warning("Shadowsocks|Client: Failed to find an available destination:", err) - return err + return errors.Base(err).Message("Shadowsocks|Client: Failed to find an available destination.") } log.Info("Shadowsocks|Client: Tunneling request to ", destination, " via ", server.Destination()) diff --git a/proxy/socks/client.go b/proxy/socks/client.go index f61f93d40..87ec76e38 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -2,13 +2,13 @@ package socks import ( "context" - "errors" "runtime" "time" "v2ray.com/core/app/log" "v2ray.com/core/common" "v2ray.com/core/common/buf" + "v2ray.com/core/common/errors" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" @@ -56,8 +56,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. }) if err != nil { - log.Warning("Socks|Client: Failed to find an available destination.") - return err + return errors.Base(err).Message("Socks|Client: Failed to find an available destination.") } defer conn.Close() diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 708c3dd8c..c0def0b7c 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -51,7 +51,7 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb var rec *protocol.ServerSpec var conn internet.Connection - err := retry.ExponentialBackoff(5, 100).On(func() error { + err := retry.ExponentialBackoff(5, 200).On(func() error { rec = v.serverPicker.PickServer() rawConn, err := dialer.Dial(ctx, rec.Destination()) if err != nil { @@ -62,8 +62,7 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb return nil }) if err != nil { - log.Warning("VMess|Outbound: Failed to find an available destination:", err) - return err + return errors.Base(err).Message("VMess|Outbound: Failed to find an available destination.") } defer conn.Close() diff --git a/transport/internet/tcp_hub.go b/transport/internet/tcp_hub.go index c9298350c..7a8713e90 100644 --- a/transport/internet/tcp_hub.go +++ b/transport/internet/tcp_hub.go @@ -5,7 +5,6 @@ import ( "sync" "v2ray.com/core/common/errors" - "v2ray.com/core/app/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/retry" ) @@ -73,16 +72,13 @@ func (v *TCPHub) start() { v.accepting = true for v.accepting { var newConn Connection - err := retry.ExponentialBackoff(10, 200).On(func() error { + err := retry.ExponentialBackoff(10, 500).On(func() error { if !v.accepting { return nil } conn, err := v.listener.Accept() if err != nil { - if v.accepting { - log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err) - } - return err + return errors.Base(err).Message("Internet|Listener: Failed to accept new TCP connection.") } newConn = conn return nil diff --git a/transport/internet/websocket/dialer.go b/transport/internet/websocket/dialer.go index 34581475e..b14ad866f 100644 --- a/transport/internet/websocket/dialer.go +++ b/transport/internet/websocket/dialer.go @@ -2,12 +2,12 @@ package websocket import ( "context" - "io/ioutil" "net" "github.com/gorilla/websocket" "v2ray.com/core/app/log" "v2ray.com/core/common" + "v2ray.com/core/common/errors" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/internal" @@ -78,11 +78,11 @@ func wsDial(ctx context.Context, dest v2net.Destination) (net.Conn, error) { conn, resp, err := dialer.Dial(uri, nil) if err != nil { + var reason string if resp != nil { - reason, reasonerr := ioutil.ReadAll(resp.Body) - log.Info(string(reason), reasonerr) + reason = resp.Status } - return nil, err + return nil, errors.Base(err).Format("WebSocket|Dialer: Failed to dial to (", uri, "): ", reason) } return &wsconn{