diff --git a/common/io/transport.go b/common/io/transport.go index 344eaa499..e4a7f04f3 100644 --- a/common/io/transport.go +++ b/common/io/transport.go @@ -1,6 +1,7 @@ package io import ( + "io" "v2ray.com/core/common/log" ) @@ -25,3 +26,11 @@ func Pipe(reader Reader, writer Writer) error { } } } + +func PipeUntilEOF(reader Reader, writer Writer) error { + err := Pipe(reader, writer) + if err != nil && err != io.EOF { + return err + } + return nil +} diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index a50c7cbcb..7f1187ec9 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -179,7 +179,9 @@ func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { v2reader := v2io.NewAdaptiveReader(reader) defer v2reader.Release() - v2io.Pipe(v2reader, ray.InboundInput()) + if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { + log.Info("Dokodemo: Failed to transport all TCP request: ", err) + } wg.Done() ray.InboundInput().Close() }() @@ -189,7 +191,9 @@ func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { v2writer := v2io.NewAdaptiveWriter(conn) defer v2writer.Release() - v2io.Pipe(ray.InboundOutput(), v2writer) + if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { + log.Info("Dokodemo: Failed to transport all TCP response: ", err) + } wg.Done() }() diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 32bd965db..7f86a2a20 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -104,7 +104,9 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * v2writer := v2io.NewAdaptiveWriter(conn) defer v2writer.Release() - v2io.Pipe(input, v2writer) + if err := v2io.PipeUntilEOF(input, v2writer); err != nil { + log.Info("Freedom: Failed to transport all TCP request: ", err) + } if tcpConn, ok := conn.(*tcp.RawConnection); ok { tcpConn.CloseWrite() } @@ -121,7 +123,9 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * } v2reader := v2io.NewAdaptiveReader(reader) - v2io.Pipe(v2reader, output) + if err := v2io.PipeUntilEOF(v2reader, output); err != nil { + log.Info("Freedom: Failed to transport all TCP response: ", err) + } v2reader.Release() ray.OutboundOutput().Close() diff --git a/proxy/http/server.go b/proxy/http/server.go index ce26153b6..56fdc8081 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -160,7 +160,9 @@ func (this *Server) transport(input io.Reader, output io.Writer, ray ray.Inbound v2reader := v2io.NewAdaptiveReader(input) defer v2reader.Release() - v2io.Pipe(v2reader, ray.InboundInput()) + if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { + log.Info("HTTP: Failed to transport all TCP request: ", err) + } ray.InboundInput().Close() wg.Done() }() @@ -169,7 +171,9 @@ func (this *Server) transport(input io.Reader, output io.Writer, ray ray.Inbound v2writer := v2io.NewAdaptiveWriter(output) defer v2writer.Release() - v2io.Pipe(ray.InboundOutput(), v2writer) + if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { + log.Info("HTTP: Failed to transport all TCP response: ", err) + } ray.InboundOutput().Release() wg.Done() }() diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index c16e41101..da43c9270 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -2,7 +2,6 @@ package shadowsocks import ( "errors" - "io" "sync" "v2ray.com/core/app" "v2ray.com/core/common/alloc" @@ -113,18 +112,14 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe return } - if err := v2io.Pipe(responseReader, ray.OutboundOutput()); err != nil { - if err != io.EOF { - log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err) - } + if err := v2io.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil { + log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err) } }() bufferedWriter.SetCached(false) - if err := v2io.Pipe(ray.OutboundInput(), bodyWriter); err != nil { - if err != io.EOF { - log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err) - } + if err := v2io.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { + log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err) } responseMutex.Lock() @@ -143,10 +138,8 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe User: user, } - if err := v2io.Pipe(reader, ray.OutboundOutput()); err != nil { - if err != io.EOF { - log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) - } + if err := v2io.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil { + log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) } }() @@ -159,10 +152,8 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe return errors.New("Shadowsocks|Client: Failed to write payload: " + err.Error()) } } - if err := v2io.Pipe(ray.OutboundInput(), writer); err != nil { - if err != io.EOF { - log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) - } + if err := v2io.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { + log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) } responseMutex.Lock() diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index ed95ef759..c3069080e 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -198,11 +198,15 @@ func (this *Server) handleConnection(conn internet.Connection) { responseWriter.Write(payload) bufferedWriter.SetCached(false) - v2io.Pipe(ray.InboundOutput(), responseWriter) + if err := v2io.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { + log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) + } } }() - v2io.Pipe(bodyReader, ray.InboundInput()) + if err := v2io.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil { + log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err) + } ray.InboundInput().Close() writeFinish.Lock() diff --git a/proxy/socks/server.go b/proxy/socks/server.go index d3030f0e4..bcaa34056 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -304,14 +304,18 @@ func (this *Server) transport(reader io.Reader, writer io.Writer, session *proxy v2reader := v2io.NewAdaptiveReader(reader) defer v2reader.Release() - v2io.Pipe(v2reader, input) + if err := v2io.PipeUntilEOF(v2reader, input); err != nil { + log.Info("Socks|Server: Failed to transport all TCP request: ", err) + } input.Close() }() v2writer := v2io.NewAdaptiveWriter(writer) defer v2writer.Release() - v2io.Pipe(output, v2writer) + if err := v2io.PipeUntilEOF(output, v2writer); err != nil { + log.Info("Socks|Server: Failed to transport all TCP response: ", err) + } output.Release() } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 011c7b262..ff5737f33 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -192,8 +192,7 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection } else { requestReader = v2io.NewAdaptiveReader(bodyReader) } - err := v2io.Pipe(requestReader, input) - if err != io.EOF { + if err := v2io.PipeUntilEOF(requestReader, input); err != nil { connection.SetReusable(false) } @@ -229,8 +228,7 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection writer.SetCached(false) - err = v2io.Pipe(output, v2writer) - if err != io.EOF { + if err := v2io.PipeUntilEOF(output, v2writer); err != nil { connection.SetReusable(false) } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 2057ed260..3bbeacf47 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -1,7 +1,6 @@ package outbound import ( - "io" "sync" "v2ray.com/core/app" @@ -107,8 +106,7 @@ func (this *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, } writer.SetCached(false) - err := v2io.Pipe(input, streamWriter) - if err != io.EOF { + if err := v2io.PipeUntilEOF(input, streamWriter); err != nil { conn.SetReusable(false) } @@ -150,8 +148,7 @@ func (this *VMessOutboundHandler) handleResponse(session *encoding.ClientSession bodyReader = v2io.NewAdaptiveReader(decryptReader) } - err = v2io.Pipe(bodyReader, output) - if err != io.EOF { + if err := v2io.PipeUntilEOF(bodyReader, output); err != nil { conn.SetReusable(false) }