From 1f8fcb558db67f035b29a9d452b7e6bdcae8d480 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Mon, 19 Feb 2018 21:38:04 +0100 Subject: [PATCH] fix error handling in freedom, shadowsocks and socks --- proxy/freedom/freedom.go | 8 +++++--- proxy/shadowsocks/client.go | 2 -- proxy/socks/client.go | 30 +++++++++++++++++++++++++----- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index ef87340f4..80292fb60 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -114,6 +114,8 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial timer := signal.CancelAfterInactivity(ctx, cancel, h.policy().Timeouts.ConnectionIdle) requestDone := signal.ExecuteAsync(func() error { + defer timer.SetTimeout(h.policy().Timeouts.DownlinkOnly) + var writer buf.Writer if destination.Network == net.Network_TCP { writer = buf.NewWriter(conn) @@ -123,18 +125,18 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to process request").Base(err) } - timer.SetTimeout(h.policy().Timeouts.DownlinkOnly) + return nil }) responseDone := signal.ExecuteAsync(func() error { - defer output.Close() + defer timer.SetTimeout(h.policy().Timeouts.UplinkOnly) v2reader := buf.NewReader(conn) if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil { return newError("failed to process response").Base(err) } - timer.SetTimeout(h.policy().Timeouts.UplinkOnly) + return nil }) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index d622ba065..58e659db3 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -115,7 +115,6 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale }) responseDone := signal.ExecuteAsync(func() error { - defer outboundRay.OutboundOutput().Close() defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) responseReader, err := ReadTCPResponse(user, conn) @@ -150,7 +149,6 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale }) responseDone := signal.ExecuteAsync(func() error { - defer outboundRay.OutboundOutput().Close() defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) reader := &UDPReader{ diff --git a/proxy/socks/client.go b/proxy/socks/client.go index ea97b8362..78191e1e6 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -4,6 +4,7 @@ import ( "context" "time" + "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" @@ -17,7 +18,8 @@ import ( // Client is a Socks5 client. type Client struct { - serverPicker protocol.ServerPicker + serverPicker protocol.ServerPicker + policyManager core.PolicyManager } // NewClient create a new Socks5 client based on the given config. @@ -30,8 +32,14 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { return nil, newError("0 target server") } + v := core.FromContext(ctx) + if v == nil { + return nil, newError("V is not in context") + } + return &Client{ - serverPicker: protocol.NewRoundRobinServerPicker(serverList), + serverPicker: protocol.NewRoundRobinServerPicker(serverList), + policyManager: v.PolicyManager(), }, nil } @@ -63,6 +71,8 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. defer conn.Close() + p := c.policyManager.ForLevel(0) + request := &protocol.RequestHeader{ Version: socks5Version, Command: protocol.RequestCommandTCP, @@ -76,24 +86,33 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. user := server.PickUser() if user != nil { request.User = user + p = c.policyManager.ForLevel(user.Level) } + if err := conn.SetDeadline(time.Now().Add(p.Timeouts.Handshake)); err != nil { + newError("failed to set deadline for handshake").Base(err).WriteToLog() + } udpRequest, err := ClientHandshake(request, conn, conn) if err != nil { return newError("failed to establish connection to server").AtWarning().Base(err) } + if err := conn.SetDeadline(time.Time{}); err != nil { + newError("failed to clear deadline after handshake").Base(err).WriteToLog() + } + ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*5) + timer := signal.CancelAfterInactivity(ctx, cancel, p.Timeouts.ConnectionIdle) var requestFunc func() error var responseFunc func() error if request.Command == protocol.RequestCommandTCP { requestFunc = func() error { + defer timer.SetTimeout(p.Timeouts.DownlinkOnly) return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer)) } responseFunc = func() error { - defer ray.OutboundOutput().Close() + defer timer.SetTimeout(p.Timeouts.UplinkOnly) return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer)) } } else if request.Command == protocol.RequestCommandUDP { @@ -103,10 +122,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. } defer udpConn.Close() requestFunc = func() error { + defer timer.SetTimeout(p.Timeouts.DownlinkOnly) return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer)) } responseFunc = func() error { - defer ray.OutboundOutput().Close() + defer timer.SetTimeout(p.Timeouts.UplinkOnly) reader := &UDPReader{reader: udpConn} return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer)) }