From 07847576b5cd28bda2d173e04050e39be38c52fd Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sun, 4 Jun 2017 21:32:01 +0200 Subject: [PATCH] refine copy handler --- common/buf/copy.go | 100 +++++++++++++++++++++++++++++++++++++++++++++ common/buf/io.go | 88 --------------------------------------- 2 files changed, 100 insertions(+), 88 deletions(-) create mode 100644 common/buf/copy.go diff --git a/common/buf/copy.go b/common/buf/copy.go new file mode 100644 index 000000000..4046bbc34 --- /dev/null +++ b/common/buf/copy.go @@ -0,0 +1,100 @@ +package buf + +import ( + "io" + + "v2ray.com/core/common/errors" + "v2ray.com/core/common/signal" +) + +type errorHandler func(error) error +type dataHandler func(MultiBuffer) + +type copyHandler struct { + onReadError []errorHandler + onData []dataHandler + onWriteError []errorHandler +} + +func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) { + mb, err := reader.Read() + if err != nil { + for _, handler := range h.onReadError { + err = handler(err) + } + } + return mb, err +} + +func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error { + err := writer.Write(mb) + if err != nil { + for _, handler := range h.onWriteError { + err = handler(err) + } + } + return err +} + +type CopyOption func(*copyHandler) + +func IgnoreReaderError() CopyOption { + return func(handler *copyHandler) { + handler.onReadError = append(handler.onReadError, func(err error) error { + return nil + }) + } +} + +func IgnoreWriterError() CopyOption { + return func(handler *copyHandler) { + handler.onWriteError = append(handler.onWriteError, func(err error) error { + return nil + }) + } +} + +func UpdateActivity(timer signal.ActivityTimer) CopyOption { + return func(handler *copyHandler) { + handler.onData = append(handler.onData, func(MultiBuffer) { + timer.Update() + }) + } +} + +func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { + for { + buffer, err := handler.readFrom(reader) + if err != nil { + return err + } + + if buffer.IsEmpty() { + buffer.Release() + continue + } + + for _, handler := range handler.onData { + handler(buffer) + } + + if err := handler.writeTo(writer, buffer); err != nil { + buffer.Release() + return err + } + } +} + +// Copy dumps all payload from reader to writer or stops when an error occurs. +// ActivityTimer gets updated as soon as there is a payload. +func Copy(reader Reader, writer Writer, options ...CopyOption) error { + handler := new(copyHandler) + for _, option := range options { + option(handler) + } + err := copyInternal(reader, writer, handler) + if err != nil && errors.Cause(err) != io.EOF { + return err + } + return nil +} diff --git a/common/buf/io.go b/common/buf/io.go index e04cebae2..0fe9e4f9e 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -3,9 +3,6 @@ package buf import ( "io" "time" - - "v2ray.com/core/common/errors" - "v2ray.com/core/common/signal" ) // Reader extends io.Reader with alloc.Buffer. @@ -47,91 +44,6 @@ func ReadAtLeastFrom(reader io.Reader, size int) Supplier { } } -type copyHandler struct { - onReadError func(error) error - onData func() - onWriteError func(error) error -} - -func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) { - mb, err := reader.Read() - if err != nil && h.onReadError != nil { - err = h.onReadError(err) - } - return mb, err -} - -func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error { - err := writer.Write(mb) - if err != nil && h.onWriteError != nil { - err = h.onWriteError(err) - } - return err -} - -type CopyOption func(*copyHandler) - -func IgnoreReaderError() CopyOption { - return func(handler *copyHandler) { - handler.onReadError = func(err error) error { - return nil - } - } -} - -func IgnoreWriterError() CopyOption { - return func(handler *copyHandler) { - handler.onWriteError = func(err error) error { - return nil - } - } -} - -func UpdateActivity(timer signal.ActivityTimer) CopyOption { - return func(handler *copyHandler) { - handler.onData = func() { - timer.Update() - } - } -} - -func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { - for { - buffer, err := handler.readFrom(reader) - if err != nil { - return err - } - - if buffer.IsEmpty() { - buffer.Release() - continue - } - - if handler.onData != nil { - handler.onData() - } - - if err := handler.writeTo(writer, buffer); err != nil { - buffer.Release() - return err - } - } -} - -// Copy dumps all payload from reader to writer or stops when an error occurs. -// ActivityTimer gets updated as soon as there is a payload. -func Copy(reader Reader, writer Writer, options ...CopyOption) error { - handler := new(copyHandler) - for _, option := range options { - option(handler) - } - err := copyInternal(reader, writer, handler) - if err != nil && errors.Cause(err) != io.EOF { - return err - } - return nil -} - // NewReader creates a new Reader. // The Reader instance doesn't take the ownership of reader. func NewReader(reader io.Reader) Reader {