diff --git a/common/buf/io.go b/common/buf/io.go index bed9d84f0..0b801449e 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -40,13 +40,7 @@ func ReadFullFrom(reader io.Reader, size int32) Supplier { } } -// ReadAtLeastFrom create a Supplier to read at least size bytes from the given io.Reader. -func ReadAtLeastFrom(reader io.Reader, size int) Supplier { - return func(b []byte) (int, error) { - return io.ReadAtLeast(reader, b, size) - } -} - +// WriteAllBytes ensures all bytes are written into the given writer. func WriteAllBytes(writer io.Writer, payload []byte) error { for len(payload) > 0 { n, err := writer.Write(payload) @@ -89,10 +83,3 @@ func NewWriter(writer io.Writer) Writer { Writer: writer, } } - -// NewSequentialWriter returns a Writer that write Buffers in a MultiBuffer sequentially. -func NewSequentialWriter(writer io.Writer) Writer { - return &SequentialWriter{ - Writer: writer, - } -} diff --git a/common/buf/writer.go b/common/buf/writer.go index 04ca3da42..cb48691bc 100644 --- a/common/buf/writer.go +++ b/common/buf/writer.go @@ -171,10 +171,12 @@ func (w *BufferedWriter) Close() error { return common.Close(w.writer) } +// SequentialWriter is a Writer that writes MultiBuffer sequentially into the underlying io.Writer. type SequentialWriter struct { io.Writer } +// WriteMultiBuffer implements Writer. func (w *SequentialWriter) WriteMultiBuffer(mb MultiBuffer) error { defer mb.Release() diff --git a/common/buf/writer_test.go b/common/buf/writer_test.go index 09b01559f..270aabfb8 100644 --- a/common/buf/writer_test.go +++ b/common/buf/writer_test.go @@ -73,14 +73,21 @@ func TestDiscardBytesMultiBuffer(t *testing.T) { } func TestWriterInterface(t *testing.T) { - assert := With(t) + { + var writer interface{} = (*BufferToBytesWriter)(nil) + switch writer.(type) { + case Writer, io.Writer, io.ReaderFrom: + default: + t.Error("BufferToBytesWriter is not Writer, io.Writer or io.ReaderFrom") + } + } - assert((*BufferToBytesWriter)(nil), Implements, (*Writer)(nil)) - assert((*BufferToBytesWriter)(nil), Implements, (*io.Writer)(nil)) - assert((*BufferToBytesWriter)(nil), Implements, (*io.ReaderFrom)(nil)) - - assert((*BufferedWriter)(nil), Implements, (*Writer)(nil)) - assert((*BufferedWriter)(nil), Implements, (*io.Writer)(nil)) - assert((*BufferedWriter)(nil), Implements, (*io.ReaderFrom)(nil)) - assert((*BufferedWriter)(nil), Implements, (*io.ByteWriter)(nil)) + { + var writer interface{} = (*BufferedWriter)(nil) + switch writer.(type) { + case Writer, io.Writer, io.ReaderFrom, io.ByteWriter: + default: + t.Error("BufferedWriter is not Writer, io.Writer, io.ReaderFrom or io.ByteWriter") + } + } } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index fa4fbfe90..bd71ef0e4 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -110,14 +110,14 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in } else { //if we are in TPROXY mode, use linux's udp forging functionality if !d.config.FollowRedirect { - writer = buf.NewSequentialWriter(conn) + writer = &buf.SequentialWriter{Writer: conn} } else { srca := net.UDPAddr{IP: dest.Address.IP(), Port: int(dest.Port.Value())} origsend, err := udp.TransmitSocket(&srca, conn.RemoteAddr()) if err != nil { return err } - writer = buf.NewSequentialWriter(origsend) + writer = &buf.SequentialWriter{Writer: origsend} } } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index fe69381d2..4206d3120 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -128,7 +128,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia if destination.Network == net.Network_TCP { writer = buf.NewWriter(conn) } else { - writer = buf.NewSequentialWriter(conn) + writer = &buf.SequentialWriter{Writer: conn} } if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to process request").Base(err) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 8b1478a28..7c0232d40 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -133,10 +133,10 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial if request.Command == protocol.RequestCommandUDP { - writer := buf.NewSequentialWriter(&UDPWriter{ + writer := &buf.SequentialWriter{Writer: &UDPWriter{ Writer: conn, Request: request, - }) + }} requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) diff --git a/proxy/shadowsocks/protocol_test.go b/proxy/shadowsocks/protocol_test.go index 2a1fd855c..673444dd8 100644 --- a/proxy/shadowsocks/protocol_test.go +++ b/proxy/shadowsocks/protocol_test.go @@ -142,7 +142,7 @@ func TestUDPReaderWriter(t *testing.T) { }), } cache := buf.New() - writer := buf.NewSequentialWriter(&UDPWriter{ + writer := &buf.SequentialWriter{Writer: &UDPWriter{ Writer: cache, Request: &protocol.RequestHeader{ Version: Version, @@ -151,7 +151,7 @@ func TestUDPReaderWriter(t *testing.T) { User: user, Option: RequestOptionOneTimeAuth, }, - }) + }} reader := &UDPReader{ Reader: cache, diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 8765b94e7..e837b53c2 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -123,7 +123,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial defer udpConn.Close() // nolint: errcheck requestFunc = func() error { defer timer.SetTimeout(p.Timeouts.DownlinkOnly) - return buf.Copy(link.Reader, buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer)) + return buf.Copy(link.Reader, &buf.SequentialWriter{Writer: NewUDPWriter(request, udpConn)}, buf.UpdateActivity(timer)) } responseFunc = func() error { defer timer.SetTimeout(p.Timeouts.UplinkOnly) diff --git a/proxy/socks/protocol_test.go b/proxy/socks/protocol_test.go index 325f3c7bf..73f517f75 100644 --- a/proxy/socks/protocol_test.go +++ b/proxy/socks/protocol_test.go @@ -20,7 +20,7 @@ func TestUDPEncoding(t *testing.T) { Address: net.IPAddress([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6}), Port: 1024, } - writer := buf.NewSequentialWriter(NewUDPWriter(request, b)) + writer := &buf.SequentialWriter{Writer: NewUDPWriter(request, b)} content := []byte{'a'} payload := buf.New()