diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index c0bcc1621..09918120e 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -167,7 +167,8 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { Destination: dest, Inbound: v.meta, }) - defer ray.InboundOutput().Release() + output := ray.InboundOutput() + defer output.ForceClose() reader := v2net.NewTimeOutReader(v.config.Timeout, conn) defer reader.Release() @@ -187,19 +188,21 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { }) responseDone := signal.ExecuteAsync(func() error { - defer ray.InboundOutput().Release() + defer output.ForceClose() v2writer := buf.NewWriter(conn) defer v2writer.Release() - if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { + if err := buf.PipeUntilEOF(output, v2writer); err != nil { log.Info("Dokodemo: Failed to transport all TCP response: ", err) return err } return nil }) - signal.ErrorOrFinish2(requestDone, responseDone) + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("Dokodemo: Connection ends with ", err) + } } type Factory struct{} diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 3249a8fa5..8fe0e471e 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -71,8 +71,10 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r log.Info("Freedom: Opening connection to ", destination) defer payload.Release() - defer ray.OutboundInput().Release() - defer ray.OutboundOutput().Close() + input := ray.OutboundInput() + output := ray.OutboundOutput() + defer input.ForceClose() + defer output.Close() var conn internet.Connection if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() { @@ -92,9 +94,6 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r } defer conn.Close() - input := ray.OutboundInput() - output := ray.OutboundOutput() - if !payload.IsEmpty() { if _, err := conn.Write(payload.Bytes()); err != nil { log.Warning("Freedom: Failed to write to destination: ", destination, ": ", err) @@ -103,7 +102,7 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r } requestDone := signal.ExecuteAsync(func() error { - defer input.Release() + defer input.ForceClose() v2writer := buf.NewWriter(conn) defer v2writer.Release() diff --git a/proxy/http/server.go b/proxy/http/server.go index bd06edede..a17936f89 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -169,7 +169,7 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo }) responseDone := signal.ExecuteAsync(func() error { - defer ray.InboundOutput().Release() + defer ray.InboundOutput().ForceClose() v2writer := buf.NewWriter(writer) defer v2writer.Release() @@ -236,9 +236,14 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn StripHopByHopHeaders(request) ray := v.packetDispatcher.DispatchToOutbound(session) + input := ray.InboundInput() + output := ray.InboundOutput() + + defer input.Close() + defer output.ForceClose() requestDone := signal.ExecuteAsync(func() error { - defer ray.InboundInput().Close() + defer input.Close() requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput())) defer requestWriter.Release() @@ -254,7 +259,7 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn }) responseDone := signal.ExecuteAsync(func() error { - defer ray.InboundOutput().Release() + defer output.ForceClose() responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput())) response, err := http.ReadResponse(responseReader, request) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 12b41e5e4..a53fb006e 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -109,7 +109,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra bufferedWriter.SetBuffered(false) requestDone := signal.ExecuteAsync(func() error { - defer ray.OutboundInput().Release() + defer ray.OutboundInput().ForceClose() if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { return err @@ -151,7 +151,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra } requestDone := signal.ExecuteAsync(func() error { - defer ray.OutboundInput().Release() + defer ray.OutboundInput().ForceClose() if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 854c32729..9f1761f51 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -175,11 +175,11 @@ func (v *Server) handleConnection(conn internet.Connection) { User: request.User, Inbound: v.meta, }) - defer ray.InboundOutput().Release() + defer ray.InboundOutput().ForceClose() defer ray.InboundInput().Close() requestDone := signal.ExecuteAsync(func() error { - defer ray.InboundOutput().Release() + defer ray.InboundOutput().ForceClose() bufferedWriter := bufio.NewWriter(conn) defer bufferedWriter.Release() diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 0ec3d03e5..e13b2d8ec 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -314,7 +314,7 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se }) responseDone := signal.ExecuteAsync(func() error { - defer output.Release() + defer output.ForceClose() v2writer := buf.NewWriter(writer) defer v2writer.Release() diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 04ae2a45a..c06450984 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -143,7 +143,7 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH } func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error { - defer input.Release() + defer input.ForceClose() session.EncodeResponseHeader(response, output) bodyWriter := session.EncodeResponseBody(request, output) @@ -218,7 +218,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { input := ray.InboundInput() output := ray.InboundOutput() defer input.Close() - defer output.Release() + defer output.ForceClose() userSettings := request.User.GetSettings() connReader.SetTimeOut(userSettings.PayloadReadTimeout) diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 41ad0e39d..668a19e7f 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -28,7 +28,7 @@ type VMessOutboundHandler struct { // Dispatch implements OutboundHandler.Dispatch(). func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) { defer payload.Release() - defer ray.OutboundInput().Release() + defer ray.OutboundInput().ForceClose() defer ray.OutboundOutput().Close() var rec *protocol.ServerSpec @@ -83,7 +83,7 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B session := encoding.NewClientSession(protocol.DefaultIDHash) requestDone := signal.ExecuteAsync(func() error { - defer input.Release() + defer input.ForceClose() writer := bufio.NewWriter(conn) defer writer.Release() diff --git a/transport/ray/ray.go b/transport/ray/ray.go index a71ccb8af..1a4c9ba45 100644 --- a/transport/ray/ray.go +++ b/transport/ray/ray.go @@ -35,12 +35,10 @@ type Ray interface { type InputStream interface { buf.Reader - Close() ForceClose() } type OutputStream interface { buf.Writer Close() - ForceClose() }