diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 8db25918a..517fd0d3e 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -154,7 +154,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { return } } - if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil { + if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil { log.Trace(newError("failed to fetch all input").Base(err)) } } @@ -309,7 +309,7 @@ type ServerWorker struct { func handle(ctx context.Context, s *Session, output buf.Writer) { writer := NewResponseWriter(s.ID, output) - if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil { + if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil { log.Trace(newError("session ", s.ID, " ends: ").Base(err)) } writer.Close() diff --git a/common/buf/io.go b/common/buf/io.go index a36158492..df18d95c6 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -40,9 +40,7 @@ func ReadFullFrom(reader io.Reader, size int) Supplier { } } -// Pipe dumps all payload from reader to writer, until an error occurs. -// ActivityTimer gets updated as soon as there is a payload. -func Pipe(timer signal.ActivityTimer, reader Reader, writer Writer) error { +func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer) error { for { buffer, err := reader.Read() if err != nil { @@ -64,9 +62,10 @@ func Pipe(timer signal.ActivityTimer, reader Reader, writer Writer) error { } } -// PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF. -func PipeUntilEOF(timer signal.ActivityTimer, reader Reader, writer Writer) error { - err := Pipe(timer, reader, writer) +// Copy dumps all payload from reader to writer or stops when an error occurs. +// ActivityTimer gets updated as soon as there is a payload. +func Copy(timer signal.ActivityTimer, reader Reader, writer Writer) error { + err := copyInternal(timer, reader, writer) if err != nil && errors.Cause(err) != io.EOF { return err } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 6519e897f..1c4a7716a 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -76,7 +76,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in chunkReader := buf.NewReader(conn) - if err := buf.PipeUntilEOF(timer, chunkReader, inboundRay.InboundInput()); err != nil { + if err := buf.Copy(timer, chunkReader, inboundRay.InboundInput()); err != nil { return newError("failed to transport request").Base(err) } @@ -86,7 +86,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in responseDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(conn) - if err := buf.PipeUntilEOF(timer, inboundRay.InboundOutput(), v2writer); err != nil { + if err := buf.Copy(timer, inboundRay.InboundOutput(), v2writer); err != nil { return newError("failed to transport response").Base(err) } return nil diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index e815f6044..a2bd0182c 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -119,7 +119,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } else { writer = &seqWriter{writer: conn} } - if err := buf.PipeUntilEOF(timer, input, writer); err != nil { + if err := buf.Copy(timer, input, writer); err != nil { return newError("failed to process request").Base(err) } return nil @@ -129,7 +129,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial defer output.Close() v2reader := buf.NewReader(conn) - if err := buf.PipeUntilEOF(timer, v2reader, output); err != nil { + if err := buf.Copy(timer, v2reader, output); err != nil { return newError("failed to process response").Base(err) } return nil diff --git a/proxy/http/server.go b/proxy/http/server.go index 65e835e7e..18f895780 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -152,7 +152,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade defer ray.InboundInput().Close() v2reader := buf.NewReader(reader) - if err := buf.PipeUntilEOF(timer, v2reader, ray.InboundInput()); err != nil { + if err := buf.Copy(timer, v2reader, ray.InboundInput()); err != nil { return err } return nil @@ -160,7 +160,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade responseDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(writer) - if err := buf.PipeUntilEOF(timer, ray.InboundOutput(), v2writer); err != nil { + if err := buf.Copy(timer, ray.InboundOutput(), v2writer); err != nil { return err } return nil diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 5da0d01c0..91cdea448 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -105,7 +105,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale } requestDone := signal.ExecuteAsync(func() error { - if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), bodyWriter); err != nil { + if err := buf.Copy(timer, outboundRay.OutboundInput(), bodyWriter); err != nil { return err } return nil @@ -119,7 +119,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale return err } - if err := buf.PipeUntilEOF(timer, responseReader, outboundRay.OutboundOutput()); err != nil { + if err := buf.Copy(timer, responseReader, outboundRay.OutboundOutput()); err != nil { return err } @@ -141,7 +141,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale } requestDone := signal.ExecuteAsync(func() error { - if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), writer); err != nil { + if err := buf.Copy(timer, outboundRay.OutboundInput(), writer); err != nil { return newError("failed to transport all UDP request").Base(err) } return nil @@ -155,7 +155,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale User: user, } - if err := buf.PipeUntilEOF(timer, reader, outboundRay.OutboundOutput()); err != nil { + if err := buf.Copy(timer, reader, outboundRay.OutboundOutput()); err != nil { return newError("failed to transport all UDP response").Base(err) } return nil diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index f22a19f9b..6ced719ee 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -173,7 +173,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, return err } - if err := buf.PipeUntilEOF(timer, ray.InboundOutput(), responseWriter); err != nil { + if err := buf.Copy(timer, ray.InboundOutput(), responseWriter); err != nil { return newError("failed to transport all TCP response").Base(err) } @@ -183,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, requestDone := signal.ExecuteAsync(func() error { defer ray.InboundInput().Close() - if err := buf.PipeUntilEOF(timer, bodyReader, ray.InboundInput()); err != nil { + if err := buf.Copy(timer, bodyReader, ray.InboundInput()); err != nil { return newError("failed to transport all TCP request").Base(err) } return nil diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 6a18c09fd..97eaee6af 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -90,11 +90,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. var responseFunc func() error if request.Command == protocol.RequestCommandTCP { requestFunc = func() error { - return buf.PipeUntilEOF(timer, ray.OutboundInput(), buf.NewWriter(conn)) + return buf.Copy(timer, ray.OutboundInput(), buf.NewWriter(conn)) } responseFunc = func() error { defer ray.OutboundOutput().Close() - return buf.PipeUntilEOF(timer, buf.NewReader(conn), ray.OutboundOutput()) + return buf.Copy(timer, buf.NewReader(conn), ray.OutboundOutput()) } } else if request.Command == protocol.RequestCommandUDP { udpConn, err := dialer.Dial(ctx, udpRequest.Destination()) @@ -103,12 +103,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. } defer udpConn.Close() requestFunc = func() error { - return buf.PipeUntilEOF(timer, ray.OutboundInput(), &UDPWriter{request: request, writer: udpConn}) + return buf.Copy(timer, ray.OutboundInput(), &UDPWriter{request: request, writer: udpConn}) } responseFunc = func() error { defer ray.OutboundOutput().Close() reader := &UDPReader{reader: udpConn} - return buf.PipeUntilEOF(timer, reader, ray.OutboundOutput()) + return buf.Copy(timer, reader, ray.OutboundOutput()) } } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 5b3c2b5ea..cf6e08211 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -125,7 +125,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ defer input.Close() v2reader := buf.NewReader(reader) - if err := buf.PipeUntilEOF(timer, v2reader, input); err != nil { + if err := buf.Copy(timer, v2reader, input); err != nil { return newError("failed to transport all TCP request").Base(err) } return nil @@ -133,7 +133,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ responseDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(writer) - if err := buf.PipeUntilEOF(timer, output, v2writer); err != nil { + if err := buf.Copy(timer, output, v2writer); err != nil { return newError("failed to transport all TCP response").Base(err) } return nil diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 773b4a3e2..790a8fd5d 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -129,7 +129,7 @@ func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession defer output.Close() bodyReader := session.DecodeRequestBody(request, input) - if err := buf.PipeUntilEOF(timer, bodyReader, output); err != nil { + if err := buf.Copy(timer, bodyReader, output); err != nil { return err } return nil @@ -157,7 +157,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio } } - if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil { + if err := buf.Copy(timer, input, bodyWriter); err != nil { return err } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 544c4d322..d1584d5c7 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -123,7 +123,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial return err } - if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil { + if err := buf.Copy(timer, input, bodyWriter); err != nil { return err } @@ -147,7 +147,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial reader.SetBuffered(false) bodyReader := session.DecodeResponseBody(request, reader) - if err := buf.PipeUntilEOF(timer, bodyReader, output); err != nil { + if err := buf.Copy(timer, bodyReader, output); err != nil { return err }