From c09ca41161f5ac2b41af7f4bb98e373d3c791e2a Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sat, 28 Jan 2017 22:31:21 +0100 Subject: [PATCH] handle close wait --- proxy/dokodemo/dokodemo.go | 2 +- proxy/freedom/freedom.go | 2 +- proxy/http/server.go | 2 +- proxy/shadowsocks/client.go | 6 ++---- proxy/shadowsocks/server.go | 2 +- proxy/socks/client.go | 4 ++-- proxy/socks/server.go | 6 +++--- proxy/vmess/inbound/inbound.go | 2 +- proxy/vmess/outbound/outbound.go | 2 +- 9 files changed, 13 insertions(+), 15 deletions(-) diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index c8bd833d2..6ebddc71f 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -65,7 +65,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in timedReader := net.NewTimeOutReader(d.config.Timeout, conn) chunkReader := buf.NewReader(timedReader) - if err := buf.PipeUntilEOF(chunkReader, inboundRay.InboundInput()); err != nil { + if err := buf.Pipe(chunkReader, inboundRay.InboundInput()); err != nil { log.Info("Dokodemo: Failed to transport request: ", err) return err } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 4bee4027d..9edc2da4c 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -130,7 +130,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) erro defer output.Close() v2reader := buf.NewReader(reader) - if err := buf.PipeUntilEOF(v2reader, output); err != nil { + if err := buf.Pipe(v2reader, output); err != nil { return err } return nil diff --git a/proxy/http/server.go b/proxy/http/server.go index 43ca95373..dccb4e38d 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -136,7 +136,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade defer ray.InboundInput().Close() v2reader := buf.NewReader(reader) - if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { + if err := buf.Pipe(v2reader, ray.InboundInput()); err != nil { return err } return nil diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 925158ade..2d29a7199 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -113,7 +113,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error return err } - if err := buf.PipeUntilEOF(responseReader, outboundRay.OutboundOutput()); err != nil { + if err := buf.Pipe(responseReader, outboundRay.OutboundOutput()); err != nil { return err } @@ -122,8 +122,6 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { log.Info("Shadowsocks|Client: Connection ends with ", err) - outboundRay.OutboundInput().CloseError() - outboundRay.OutboundOutput().CloseError() return err } @@ -155,7 +153,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error User: user, } - if err := buf.PipeUntilEOF(reader, outboundRay.OutboundOutput()); err != nil { + if err := buf.Pipe(reader, outboundRay.OutboundOutput()); err != nil { log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) return err } diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 3e3d26fbf..1b0a68cab 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -177,7 +177,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) return err } - if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { + if err := buf.Pipe(ray.InboundOutput(), responseWriter); err != nil { log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) return err } diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 447d1fb22..05a09816f 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -87,7 +87,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error { } responseFunc = func() error { defer ray.OutboundOutput().Close() - return buf.PipeUntilEOF(buf.NewReader(conn), ray.OutboundOutput()) + return buf.Pipe(buf.NewReader(conn), ray.OutboundOutput()) } } else if request.Command == protocol.RequestCommandUDP { udpConn, err := dialer.Dial(ctx, udpRequest.Destination()) @@ -102,7 +102,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error { responseFunc = func() error { defer ray.OutboundOutput().Close() reader := &UDPReader{reader: net.NewTimeOutReader(16, udpConn)} - return buf.PipeUntilEOF(reader, ray.OutboundOutput()) + return buf.Pipe(reader, ray.OutboundOutput()) } } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 4955f7172..fe26a1206 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -58,6 +58,8 @@ func (s *Server) Network() net.NetworkList { } func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error { + conn.SetReusable(false) + switch network { case net.Network_TCP: return s.processTCP(ctx, conn) @@ -69,8 +71,6 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet } func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error { - conn.SetReusable(false) - timedReader := net.NewTimeOutReader(16 /* seconds, for handshake */, conn) reader := bufio.NewReader(timedReader) @@ -123,7 +123,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ defer input.Close() v2reader := buf.NewReader(reader) - if err := buf.PipeUntilEOF(v2reader, input); err != nil { + if err := buf.Pipe(v2reader, input); err != nil { log.Info("Socks|Server: Failed to transport all TCP request: ", err) return err } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 8bd27b8ef..c57c1eeb6 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -133,7 +133,7 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH defer output.Close() bodyReader := session.DecodeRequestBody(request, input) - if err := buf.PipeUntilEOF(bodyReader, output); err != nil { + if err := buf.Pipe(bodyReader, output); err != nil { return err } return nil diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index c92883bda..7cfe44290 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -145,7 +145,7 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb reader.SetBuffered(false) bodyReader := session.DecodeResponseBody(request, reader) - if err := buf.PipeUntilEOF(bodyReader, output); err != nil { + if err := buf.Pipe(bodyReader, output); err != nil { return err }