From bd8481898acd3f6d69ba6182af45b977005ae563 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 27 Apr 2017 16:48:48 +0200 Subject: [PATCH 1/4] Update version --- core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core.go b/core.go index d23e7432d..d2b215c45 100644 --- a/core.go +++ b/core.go @@ -18,7 +18,7 @@ import ( ) var ( - version = "2.25" + version = "2.26" build = "Custom" codename = "One for all" intro = "An unified platform for anti-censorship." From f418b9bc20c2a5269b887640618c74ba3add378a Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 27 Apr 2017 22:20:29 +0200 Subject: [PATCH 2/4] swallow write error in mux --- app/proxyman/mux/mux.go | 20 ++++------------- app/proxyman/mux/reader.go | 46 ++++++++++++++++++++++++++++---------- common/buf/io.go | 45 ++++++++++++++++++++++++++++++++----- common/buf/writer.go | 11 +++++++++ 4 files changed, 88 insertions(+), 34 deletions(-) diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 4236577a1..f22213765 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -175,22 +175,10 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool } func drain(reader *Reader) error { - data, err := reader.Read() - if err != nil { - return err - } - data.Release() + buf.Copy(signal.BackgroundTimer(), reader, buf.Discard) return nil } -func pipe(reader *Reader, writer buf.Writer) error { - data, err := reader.Read() - if err != nil { - return err - } - return writer.Write(data) -} - func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *Reader) error { if meta.Option.Has(OptionData) { return drain(reader) @@ -211,7 +199,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error { } if s, found := m.sessionManager.Get(meta.SessionID); found { - return pipe(reader, s.output) + return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError()) } return drain(reader) } @@ -335,7 +323,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, w.sessionManager.Add(s) go handle(ctx, s, w.outboundRay.OutboundOutput()) if meta.Option.Has(OptionData) { - return pipe(reader, s.output) + return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError()) } return nil } @@ -345,7 +333,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) err return nil } if s, found := w.sessionManager.Get(meta.SessionID); found { - return pipe(reader, s.output) + return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError()) } return drain(reader) } diff --git a/app/proxyman/mux/reader.go b/app/proxyman/mux/reader.go index 8cede0a2b..278a05c49 100644 --- a/app/proxyman/mux/reader.go +++ b/app/proxyman/mux/reader.go @@ -8,18 +8,22 @@ import ( ) type Reader struct { - reader io.Reader - buffer *buf.Buffer + reader io.Reader + buffer *buf.Buffer + leftOver int } func NewReader(reader buf.Reader) *Reader { return &Reader{ - reader: buf.ToBytesReader(reader), - buffer: buf.NewLocal(1024), + reader: buf.ToBytesReader(reader), + buffer: buf.NewLocal(1024), + leftOver: -1, } } func (r *Reader) ReadMetadata() (*FrameMetadata, error) { + r.leftOver = -1 + b := r.buffer b.Clear() @@ -37,25 +41,43 @@ func (r *Reader) ReadMetadata() (*FrameMetadata, error) { return ReadFrameFrom(b.Bytes()) } -func (r *Reader) Read() (buf.MultiBuffer, error) { +func (r *Reader) readSize() error { if err := r.buffer.Reset(buf.ReadFullFrom(r.reader, 2)); err != nil { - return nil, err + return err + } + r.leftOver = int(serial.BytesToUint16(r.buffer.Bytes())) + return nil +} + +func (r *Reader) Read() (buf.MultiBuffer, error) { + if r.leftOver == 0 { + r.leftOver = -1 + return nil, io.EOF + } + if r.leftOver == -1 { + if err := r.readSize(); err != nil { + return nil, err + } } - dataLen := int(serial.BytesToUint16(r.buffer.Bytes())) mb := buf.NewMultiBuffer() - for dataLen > 0 { + for r.leftOver > 0 { readLen := buf.Size - if dataLen < readLen { - readLen = dataLen + if r.leftOver < readLen { + readLen = r.leftOver } b := buf.New() - if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, readLen)); err != nil { + if err := b.AppendSupplier(func(bb []byte) (int, error) { + return r.reader.Read(bb[:readLen]) + }); err != nil { mb.Release() return nil, err } - dataLen -= readLen + r.leftOver -= b.Len() mb.Append(b) + if b.Len() < readLen { + break + } } return mb, nil diff --git a/common/buf/io.go b/common/buf/io.go index 37d52e4a8..b18fe3768 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -47,13 +47,40 @@ func ReadAtLeastFrom(reader io.Reader, size int) Supplier { } } -func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer) error { +type copyHandler struct { + onReadError func(error) error + onData func() + onWriteError func(error) error +} + +type CopyOption func(*copyHandler) + +func IgnoreReaderError() CopyOption { + return func(handler *copyHandler) { + handler.onReadError = func(err error) error { + return nil + } + } +} + +func IgnoreWriterError() CopyOption { + return func(handler *copyHandler) { + handler.onWriteError = func(err error) error { + return nil + } + } +} + +func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, handler copyHandler) error { for { buffer, err := reader.Read() if err != nil { - return err + if err = handler.onReadError(err); err != nil { + return err + } } + handler.onData() timer.Update() if buffer.IsEmpty() { @@ -62,16 +89,22 @@ func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer) erro } if err := writer.Write(buffer); err != nil { - buffer.Release() - return err + if err = handler.onWriteError(err); err != nil { + buffer.Release() + return err + } } } } // Copy dumps all payload from reader to writer or stops when an error occurs. // ActivityTimer gets updated as soon as there is a payload. -func Copy(timer signal.ActivityTimer, reader Reader, writer Writer) error { - err := copyInternal(timer, reader, writer) +func Copy(timer signal.ActivityTimer, reader Reader, writer Writer, options ...CopyOption) error { + handler := copyHandler{} + for _, option := range options { + option(&handler) + } + err := copyInternal(timer, reader, writer, handler) if err != nil && errors.Cause(err) != io.EOF { return err } diff --git a/common/buf/writer.go b/common/buf/writer.go index 988580d1c..ab7fbca5b 100644 --- a/common/buf/writer.go +++ b/common/buf/writer.go @@ -106,3 +106,14 @@ func (w *bytesToBufferWriter) ReadFrom(reader io.Reader) (int64, error) { } return totalBytes, nil } + +type noOpWriter struct{} + +func (noOpWriter) Write(b MultiBuffer) error { + b.Release() + return nil +} + +var ( + Discard Writer = noOpWriter{} +) From 6f3362fc4c592a45a28de86e8d444e4afdaf9f5a Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 27 Apr 2017 22:30:48 +0200 Subject: [PATCH 3/4] update activity --- app/proxyman/mux/mux.go | 13 ++++++------- common/buf/io.go | 15 +++++++++++---- proxy/dokodemo/dokodemo.go | 4 ++-- proxy/freedom/freedom.go | 4 ++-- proxy/http/server.go | 4 ++-- proxy/shadowsocks/client.go | 8 ++++---- proxy/shadowsocks/server.go | 4 ++-- proxy/socks/client.go | 8 ++++---- proxy/socks/server.go | 4 ++-- proxy/vmess/inbound/inbound.go | 4 ++-- proxy/vmess/outbound/outbound.go | 4 ++-- 11 files changed, 39 insertions(+), 33 deletions(-) diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index f22213765..ad3bcc32a 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -15,7 +15,6 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" "v2ray.com/core/common/net" - "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/ray" ) @@ -147,7 +146,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { log.Trace(newError("failed to write first payload").Base(err)) return } - if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil { + if err := buf.Copy(s.input, writer); err != nil { log.Trace(newError("failed to fetch all input").Base(err)) } } @@ -175,7 +174,7 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool } func drain(reader *Reader) error { - buf.Copy(signal.BackgroundTimer(), reader, buf.Discard) + buf.Copy(reader, buf.Discard) return nil } @@ -199,7 +198,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error { } if s, found := m.sessionManager.Get(meta.SessionID); found { - return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError()) + return buf.Copy(reader, s.output, buf.IgnoreWriterError()) } return drain(reader) } @@ -291,7 +290,7 @@ type ServerWorker struct { func handle(ctx context.Context, s *Session, output buf.Writer) { writer := NewResponseWriter(s.ID, output) - if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil { + if err := buf.Copy(s.input, writer); err != nil { log.Trace(newError("session ", s.ID, " ends: ").Base(err)) } writer.Close() @@ -323,7 +322,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, w.sessionManager.Add(s) go handle(ctx, s, w.outboundRay.OutboundOutput()) if meta.Option.Has(OptionData) { - return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError()) + return buf.Copy(reader, s.output, buf.IgnoreWriterError()) } return nil } @@ -333,7 +332,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) err return nil } if s, found := w.sessionManager.Get(meta.SessionID); found { - return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError()) + return buf.Copy(reader, s.output, buf.IgnoreWriterError()) } return drain(reader) } diff --git a/common/buf/io.go b/common/buf/io.go index b18fe3768..3c47e334e 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -71,7 +71,15 @@ func IgnoreWriterError() CopyOption { } } -func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, handler copyHandler) error { +func UpdateActivity(timer signal.ActivityTimer) CopyOption { + return func(handler *copyHandler) { + handler.onData = func() { + timer.Update() + } + } +} + +func copyInternal(reader Reader, writer Writer, handler copyHandler) error { for { buffer, err := reader.Read() if err != nil { @@ -81,7 +89,6 @@ func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, hand } handler.onData() - timer.Update() if buffer.IsEmpty() { buffer.Release() @@ -99,12 +106,12 @@ func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, hand // Copy dumps all payload from reader to writer or stops when an error occurs. // ActivityTimer gets updated as soon as there is a payload. -func Copy(timer signal.ActivityTimer, reader Reader, writer Writer, options ...CopyOption) error { +func Copy(reader Reader, writer Writer, options ...CopyOption) error { handler := copyHandler{} for _, option := range options { option(&handler) } - err := copyInternal(timer, reader, writer, handler) + err := copyInternal(reader, writer, handler) if err != nil && errors.Cause(err) != io.EOF { return err } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 1c4a7716a..c6500579f 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -76,7 +76,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in chunkReader := buf.NewReader(conn) - if err := buf.Copy(timer, chunkReader, inboundRay.InboundInput()); err != nil { + if err := buf.Copy(chunkReader, inboundRay.InboundInput(), buf.UpdateActivity(timer)); err != nil { return newError("failed to transport request").Base(err) } @@ -86,7 +86,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in responseDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(conn) - if err := buf.Copy(timer, inboundRay.InboundOutput(), v2writer); err != nil { + if err := buf.Copy(inboundRay.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport response").Base(err) } return nil diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 147ce1a1e..3b292b7e6 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -118,7 +118,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } else { writer = buf.NewSequentialWriter(conn) } - if err := buf.Copy(timer, input, writer); err != nil { + if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to process request").Base(err) } return nil @@ -128,7 +128,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial defer output.Close() v2reader := buf.NewReader(conn) - if err := buf.Copy(timer, v2reader, output); err != nil { + if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil { return newError("failed to process response").Base(err) } return nil diff --git a/proxy/http/server.go b/proxy/http/server.go index a7d777cf3..fa61d9797 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -141,7 +141,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade defer ray.InboundInput().Close() v2reader := buf.NewReader(reader) - if err := buf.Copy(timer, v2reader, ray.InboundInput()); err != nil { + if err := buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil { return err } return nil @@ -149,7 +149,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade responseDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(writer) - if err := buf.Copy(timer, ray.InboundOutput(), v2writer); err != nil { + if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil { return err } return nil diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index e6fa70ade..dd1a5177c 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -105,7 +105,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale } requestDone := signal.ExecuteAsync(func() error { - if err := buf.Copy(timer, outboundRay.OutboundInput(), bodyWriter); err != nil { + if err := buf.Copy(outboundRay.OutboundInput(), bodyWriter, buf.UpdateActivity(timer)); err != nil { return err } return nil @@ -119,7 +119,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale return err } - if err := buf.Copy(timer, responseReader, outboundRay.OutboundOutput()); err != nil { + if err := buf.Copy(responseReader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil { return err } @@ -141,7 +141,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale }) requestDone := signal.ExecuteAsync(func() error { - if err := buf.Copy(timer, outboundRay.OutboundInput(), writer); err != nil { + if err := buf.Copy(outboundRay.OutboundInput(), writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all UDP request").Base(err) } return nil @@ -155,7 +155,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale User: user, } - if err := buf.Copy(timer, reader, outboundRay.OutboundOutput()); err != nil { + if err := buf.Copy(reader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all UDP response").Base(err) } return nil diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 158e89be5..4ee5733cc 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -173,7 +173,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, return err } - if err := buf.Copy(timer, ray.InboundOutput(), responseWriter); err != nil { + if err := buf.Copy(ray.InboundOutput(), responseWriter, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP response").Base(err) } @@ -183,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, requestDone := signal.ExecuteAsync(func() error { defer ray.InboundInput().Close() - if err := buf.Copy(timer, bodyReader, ray.InboundInput()); err != nil { + if err := buf.Copy(bodyReader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP request").Base(err) } return nil diff --git a/proxy/socks/client.go b/proxy/socks/client.go index c8414da6d..5e57044c6 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -90,11 +90,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. var responseFunc func() error if request.Command == protocol.RequestCommandTCP { requestFunc = func() error { - return buf.Copy(timer, ray.OutboundInput(), buf.NewWriter(conn)) + return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer)) } responseFunc = func() error { defer ray.OutboundOutput().Close() - return buf.Copy(timer, buf.NewReader(conn), ray.OutboundOutput()) + return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer)) } } else if request.Command == protocol.RequestCommandUDP { udpConn, err := dialer.Dial(ctx, udpRequest.Destination()) @@ -103,12 +103,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. } defer udpConn.Close() requestFunc = func() error { - return buf.Copy(timer, ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn))) + return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer)) } responseFunc = func() error { defer ray.OutboundOutput().Close() reader := &UDPReader{reader: udpConn} - return buf.Copy(timer, reader, ray.OutboundOutput()) + return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer)) } } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index de7bc254d..29873a3e2 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -124,7 +124,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ defer input.Close() v2reader := buf.NewReader(reader) - if err := buf.Copy(timer, v2reader, input); err != nil { + if err := buf.Copy(v2reader, input, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP request").Base(err) } return nil @@ -132,7 +132,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ responseDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(writer) - if err := buf.Copy(timer, output, v2writer); err != nil { + if err := buf.Copy(output, v2writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP response").Base(err) } return nil diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 790a8fd5d..250d8b2db 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -129,7 +129,7 @@ func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession defer output.Close() bodyReader := session.DecodeRequestBody(request, input) - if err := buf.Copy(timer, bodyReader, output); err != nil { + if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil { return err } return nil @@ -157,7 +157,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio } } - if err := buf.Copy(timer, input, bodyWriter); err != nil { + if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil { return err } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index d1584d5c7..239c16e51 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -123,7 +123,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial return err } - if err := buf.Copy(timer, input, bodyWriter); err != nil { + if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil { return err } @@ -147,7 +147,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial reader.SetBuffered(false) bodyReader := session.DecodeResponseBody(request, reader) - if err := buf.Copy(timer, bodyReader, output); err != nil { + if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil { return err } From 3821ee21fc5894ae0a59c7cc4995e180c77e5769 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 27 Apr 2017 23:33:07 +0200 Subject: [PATCH 4/4] handle nil onData --- common/buf/io.go | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/common/buf/io.go b/common/buf/io.go index 3c47e334e..e04cebae2 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -53,6 +53,22 @@ type copyHandler struct { onWriteError func(error) error } +func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) { + mb, err := reader.Read() + if err != nil && h.onReadError != nil { + err = h.onReadError(err) + } + return mb, err +} + +func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error { + err := writer.Write(mb) + if err != nil && h.onWriteError != nil { + err = h.onWriteError(err) + } + return err +} + type CopyOption func(*copyHandler) func IgnoreReaderError() CopyOption { @@ -79,27 +95,25 @@ func UpdateActivity(timer signal.ActivityTimer) CopyOption { } } -func copyInternal(reader Reader, writer Writer, handler copyHandler) error { +func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { for { - buffer, err := reader.Read() + buffer, err := handler.readFrom(reader) if err != nil { - if err = handler.onReadError(err); err != nil { - return err - } + return err } - handler.onData() - if buffer.IsEmpty() { buffer.Release() continue } - if err := writer.Write(buffer); err != nil { - if err = handler.onWriteError(err); err != nil { - buffer.Release() - return err - } + if handler.onData != nil { + handler.onData() + } + + if err := handler.writeTo(writer, buffer); err != nil { + buffer.Release() + return err } } } @@ -107,9 +121,9 @@ func copyInternal(reader Reader, writer Writer, handler copyHandler) error { // Copy dumps all payload from reader to writer or stops when an error occurs. // ActivityTimer gets updated as soon as there is a payload. func Copy(reader Reader, writer Writer, options ...CopyOption) error { - handler := copyHandler{} + handler := new(copyHandler) for _, option := range options { - option(&handler) + option(handler) } err := copyInternal(reader, writer, handler) if err != nil && errors.Cause(err) != io.EOF {