From 72992c74785480eff95ed943c1739d37f39105de Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Tue, 10 Jan 2017 14:22:42 +0100 Subject: [PATCH] able to close ray stream with error --- app/dispatcher/impl/default.go | 2 +- app/proxy/proxy.go | 2 +- proxy/blackhole/blackhole.go | 2 +- proxy/dokodemo/dokodemo.go | 8 +++---- proxy/freedom/freedom.go | 6 ++--- proxy/http/server.go | 15 ++++++------ proxy/shadowsocks/client.go | 12 ++++++---- proxy/shadowsocks/server.go | 6 ++--- proxy/socks/client.go | 4 ++-- proxy/socks/server.go | 4 ++-- proxy/vmess/inbound/inbound.go | 5 ++-- proxy/vmess/outbound/outbound.go | 7 ++---- transport/internet/udp/udp_server.go | 2 +- transport/ray/direct.go | 34 ++++++++++++++-------------- transport/ray/ray.go | 9 ++++++-- 15 files changed, 57 insertions(+), 61 deletions(-) diff --git a/app/dispatcher/impl/default.go b/app/dispatcher/impl/default.go index 6c6548c52..0f00f651d 100644 --- a/app/dispatcher/impl/default.go +++ b/app/dispatcher/impl/default.go @@ -71,7 +71,7 @@ func (v *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.I func (v *DefaultDispatcher) waitAndDispatch(wait func() error, destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) { if err := wait(); err != nil { log.Info("DefaultDispatcher: Failed precondition: ", err) - link.OutboundInput().ForceClose() + link.OutboundInput().CloseError() link.OutboundOutput().Close() return } diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index 1a610cefc..981a46d44 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -98,7 +98,7 @@ func (v *Connection) Write(b []byte) (int, error) { func (v *Connection) Close() error { v.closed = true v.stream.InboundInput().Close() - v.stream.InboundOutput().ForceClose() + v.stream.InboundOutput().CloseError() return nil } diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index cc4e6617d..432e4337d 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -31,7 +31,7 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { v.response.WriteTo(ray.OutboundOutput()) ray.OutboundOutput().Close() - ray.OutboundInput().ForceClose() + ray.OutboundInput().CloseError() } // Factory is an utility for creating blackhole handlers. diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index dd1dd82d7..4f92cda5c 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -172,8 +172,6 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { Destination: dest, Inbound: v.meta, }) - output := ray.InboundOutput() - defer output.ForceClose() reader := v2net.NewTimeOutReader(v.config.Timeout, conn) @@ -191,11 +189,9 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { }) responseDone := signal.ExecuteAsync(func() error { - defer output.ForceClose() - v2writer := buf.NewWriter(conn) - if err := buf.PipeUntilEOF(output, v2writer); err != nil { + if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { log.Info("Dokodemo: Failed to transport all TCP response: ", err) return err } @@ -203,6 +199,8 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { }) if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + ray.InboundInput().CloseError() + ray.InboundOutput().CloseError() log.Info("Dokodemo: Connection ends with ", err) } } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 4bb441355..3fd191617 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -72,8 +72,6 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { input := ray.OutboundInput() output := ray.OutboundOutput() - defer input.ForceClose() - defer output.Close() var conn internet.Connection if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() { @@ -96,8 +94,6 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { conn.SetReusable(false) requestDone := signal.ExecuteAsync(func() error { - defer input.ForceClose() - v2writer := buf.NewWriter(conn) if err := buf.PipeUntilEOF(input, v2writer); err != nil { return err @@ -127,6 +123,8 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("Freedom: Connection ending with ", err) + input.CloseError() + output.CloseError() } } diff --git a/proxy/http/server.go b/proxy/http/server.go index 713dbe2a5..df22a2e82 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -176,8 +176,6 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo }) responseDone := signal.ExecuteAsync(func() error { - defer ray.InboundOutput().ForceClose() - v2writer := buf.NewWriter(writer) if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { return err @@ -185,7 +183,11 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo return nil }) - signal.ErrorOrFinish2(requestDone, responseDone) + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("HTTP|Server: Connection ends with: ", err) + ray.InboundInput().CloseError() + ray.InboundOutput().CloseError() + } } // @VisibleForTesting @@ -244,9 +246,6 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn input := ray.InboundInput() output := ray.InboundOutput() - defer input.Close() - defer output.ForceClose() - requestDone := signal.ExecuteAsync(func() error { defer input.Close() @@ -262,8 +261,6 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn }) responseDone := signal.ExecuteAsync(func() error { - defer output.ForceClose() - responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput())) response, err := http.ReadResponse(responseReader, request) if err != nil { @@ -283,6 +280,8 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("HTTP|Server: Connecton ending with ", err) + input.CloseError() + output.CloseError() } } diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 81d9513c8..18ea62b3b 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -96,8 +96,6 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { bufferedWriter.SetBuffered(false) requestDone := signal.ExecuteAsync(func() error { - defer ray.OutboundInput().ForceClose() - if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { return err } @@ -121,6 +119,8 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("Shadowsocks|Client: Connection ends with ", err) + ray.OutboundInput().CloseError() + ray.OutboundOutput().CloseError() } } @@ -132,8 +132,6 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { } requestDone := signal.ExecuteAsync(func() error { - defer ray.OutboundInput().ForceClose() - if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) return err @@ -158,7 +156,11 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) { return nil }) - signal.ErrorOrFinish2(requestDone, responseDone) + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("Shadowsocks|Client: Connection ends with ", err) + ray.OutboundInput().CloseError() + ray.OutboundOutput().CloseError() + } } } diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 7b3159f4e..ec7fa8ea3 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -169,12 +169,8 @@ func (v *Server) handleConnection(conn internet.Connection) { User: request.User, Inbound: v.meta, }) - defer ray.InboundOutput().ForceClose() - defer ray.InboundInput().Close() requestDone := signal.ExecuteAsync(func() error { - defer ray.InboundOutput().ForceClose() - bufferedWriter := bufio.NewWriter(conn) responseWriter, err := WriteTCPResponse(request, bufferedWriter) if err != nil { @@ -215,6 +211,8 @@ func (v *Server) handleConnection(conn internet.Connection) { if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("Shadowsocks|Server: Connection ends with ", err) + ray.InboundInput().CloseError() + ray.InboundOutput().CloseError() } } diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 73b39276e..cf3207484 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -80,7 +80,6 @@ func (c *Client) Dispatch(destination net.Destination, ray ray.OutboundRay) { var responseFunc func() error if request.Command == protocol.RequestCommandTCP { requestFunc = func() error { - defer ray.OutboundInput().ForceClose() return buf.PipeUntilEOF(ray.OutboundInput(), buf.NewWriter(conn)) } responseFunc = func() error { @@ -95,7 +94,6 @@ func (c *Client) Dispatch(destination net.Destination, ray ray.OutboundRay) { } defer udpConn.Close() requestFunc = func() error { - defer ray.OutboundInput().ForceClose() return buf.PipeUntilEOF(ray.OutboundInput(), &UDPWriter{request: request, writer: udpConn}) } responseFunc = func() error { @@ -109,6 +107,8 @@ func (c *Client) Dispatch(destination net.Destination, ray ray.OutboundRay) { responseDone := signal.ExecuteAsync(responseFunc) if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("Socks|Client: Connection ends with ", err) + ray.OutboundInput().CloseError() + ray.OutboundOutput().CloseError() } } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 56226defa..638e3c6d4 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -165,8 +165,6 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se }) responseDone := signal.ExecuteAsync(func() error { - defer output.ForceClose() - v2writer := buf.NewWriter(writer) if err := buf.PipeUntilEOF(output, v2writer); err != nil { log.Info("Socks|Server: Failed to transport all TCP response: ", err) @@ -178,6 +176,8 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("Socks|Server: Connection ends with ", err) + input.CloseError() + output.CloseError() } } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index a30a6f882..d2ab9ee8f 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -141,7 +141,6 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH } func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error { - defer input.ForceClose() session.EncodeResponseHeader(response, output) bodyWriter := session.EncodeResponseBody(request, output) @@ -215,8 +214,6 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { }) input := ray.InboundInput() output := ray.InboundOutput() - defer input.Close() - defer output.ForceClose() userSettings := request.User.GetSettings() connReader.SetTimeOut(userSettings.PayloadReadTimeout) @@ -242,6 +239,8 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("VMess|Inbound: Connection ending with ", err) connection.SetReusable(false) + input.CloseError() + output.CloseError() return } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index f645c91a0..b23d19a3a 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -30,9 +30,6 @@ type VMessOutboundHandler struct { // Dispatch implements OutboundHandler.Dispatch(). func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, outboundRay ray.OutboundRay) { - defer outboundRay.OutboundInput().ForceClose() - defer outboundRay.OutboundOutput().Close() - var rec *protocol.ServerSpec var conn internet.Connection @@ -85,8 +82,6 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, outboundRay ra session := encoding.NewClientSession(protocol.DefaultIDHash) requestDone := signal.ExecuteAsync(func() error { - defer input.ForceClose() - writer := bufio.NewWriter(conn) session.EncodeRequestHeader(request, writer) @@ -140,6 +135,8 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, outboundRay ra if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { log.Info("VMess|Outbound: Connection ending with ", err) conn.SetReusable(false) + input.CloseError() + output.CloseError() } return diff --git a/transport/internet/udp/udp_server.go b/transport/internet/udp/udp_server.go index ed5f3f4ba..228ba8df8 100644 --- a/transport/internet/udp/udp_server.go +++ b/transport/internet/udp/udp_server.go @@ -88,7 +88,7 @@ func (v *TimedInboundRay) Release() { } v.server = nil v.inboundRay.InboundInput().Close() - v.inboundRay.InboundOutput().ForceClose() + v.inboundRay.InboundOutput().CloseError() v.inboundRay = nil } diff --git a/transport/ray/direct.go b/transport/ray/direct.go index deb05f0d0..bb838f4ef 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -54,23 +54,23 @@ func (v *directRay) AddInspector(inspector Inspector) { type Stream struct { buffer chan *buf.Buffer - srcClose chan bool - destClose chan bool + close chan bool + err chan bool inspector *InspectorChain } func NewStream() *Stream { return &Stream{ buffer: make(chan *buf.Buffer, bufferSize), - srcClose: make(chan bool), - destClose: make(chan bool), + close: make(chan bool), + err: make(chan bool), inspector: &InspectorChain{}, } } func (v *Stream) Read() (*buf.Buffer, error) { select { - case <-v.destClose: + case <-v.err: return nil, io.ErrClosedPipe case b := <-v.buffer: return b, nil @@ -78,9 +78,9 @@ func (v *Stream) Read() (*buf.Buffer, error) { select { case b := <-v.buffer: return b, nil - case <-v.srcClose: + case <-v.close: return nil, io.EOF - case <-v.destClose: + case <-v.err: return nil, io.ErrClosedPipe } } @@ -88,7 +88,7 @@ func (v *Stream) Read() (*buf.Buffer, error) { func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) { select { - case <-v.destClose: + case <-v.err: return nil, io.ErrClosedPipe case b := <-v.buffer: return b, nil @@ -96,9 +96,9 @@ func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) { select { case b := <-v.buffer: return b, nil - case <-v.srcClose: + case <-v.close: return nil, io.EOF - case <-v.destClose: + case <-v.err: return nil, io.ErrClosedPipe case <-time.After(timeout): return nil, ErrReadTimeout @@ -112,15 +112,15 @@ func (v *Stream) Write(data *buf.Buffer) (err error) { } select { - case <-v.destClose: + case <-v.err: return io.ErrClosedPipe - case <-v.srcClose: + case <-v.close: return io.ErrClosedPipe default: select { - case <-v.destClose: + case <-v.err: return io.ErrClosedPipe - case <-v.srcClose: + case <-v.close: return io.ErrClosedPipe case v.buffer <- data: v.inspector.Input(data) @@ -132,13 +132,13 @@ func (v *Stream) Write(data *buf.Buffer) (err error) { func (v *Stream) Close() { defer swallowPanic() - close(v.srcClose) + close(v.close) } -func (v *Stream) ForceClose() { +func (v *Stream) CloseError() { defer swallowPanic() - close(v.destClose) + close(v.err) v.Close() n := len(v.buffer) diff --git a/transport/ray/ray.go b/transport/ray/ray.go index 26b8f0439..3d00e938f 100644 --- a/transport/ray/ray.go +++ b/transport/ray/ray.go @@ -35,13 +35,18 @@ type Ray interface { AddInspector(Inspector) } +type RayStream interface { + Close() + CloseError() +} + type InputStream interface { buf.Reader + RayStream ReadTimeout(time.Duration) (*buf.Buffer, error) - ForceClose() } type OutputStream interface { buf.Writer - Close() + RayStream }