1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-11-10 14:26:26 -05:00

cleanup buf interfaces

This commit is contained in:
Darien Raymond 2018-07-31 13:43:27 +02:00
parent b3cf1f70d7
commit 7baa6977d3
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
9 changed files with 28 additions and 32 deletions

View File

@ -40,13 +40,7 @@ func ReadFullFrom(reader io.Reader, size int32) Supplier {
} }
} }
// ReadAtLeastFrom create a Supplier to read at least size bytes from the given io.Reader. // WriteAllBytes ensures all bytes are written into the given writer.
func ReadAtLeastFrom(reader io.Reader, size int) Supplier {
return func(b []byte) (int, error) {
return io.ReadAtLeast(reader, b, size)
}
}
func WriteAllBytes(writer io.Writer, payload []byte) error { func WriteAllBytes(writer io.Writer, payload []byte) error {
for len(payload) > 0 { for len(payload) > 0 {
n, err := writer.Write(payload) n, err := writer.Write(payload)
@ -89,10 +83,3 @@ func NewWriter(writer io.Writer) Writer {
Writer: writer, Writer: writer,
} }
} }
// NewSequentialWriter returns a Writer that write Buffers in a MultiBuffer sequentially.
func NewSequentialWriter(writer io.Writer) Writer {
return &SequentialWriter{
Writer: writer,
}
}

View File

@ -171,10 +171,12 @@ func (w *BufferedWriter) Close() error {
return common.Close(w.writer) return common.Close(w.writer)
} }
// SequentialWriter is a Writer that writes MultiBuffer sequentially into the underlying io.Writer.
type SequentialWriter struct { type SequentialWriter struct {
io.Writer io.Writer
} }
// WriteMultiBuffer implements Writer.
func (w *SequentialWriter) WriteMultiBuffer(mb MultiBuffer) error { func (w *SequentialWriter) WriteMultiBuffer(mb MultiBuffer) error {
defer mb.Release() defer mb.Release()

View File

@ -73,14 +73,21 @@ func TestDiscardBytesMultiBuffer(t *testing.T) {
} }
func TestWriterInterface(t *testing.T) { func TestWriterInterface(t *testing.T) {
assert := With(t) {
var writer interface{} = (*BufferToBytesWriter)(nil)
switch writer.(type) {
case Writer, io.Writer, io.ReaderFrom:
default:
t.Error("BufferToBytesWriter is not Writer, io.Writer or io.ReaderFrom")
}
}
assert((*BufferToBytesWriter)(nil), Implements, (*Writer)(nil)) {
assert((*BufferToBytesWriter)(nil), Implements, (*io.Writer)(nil)) var writer interface{} = (*BufferedWriter)(nil)
assert((*BufferToBytesWriter)(nil), Implements, (*io.ReaderFrom)(nil)) switch writer.(type) {
case Writer, io.Writer, io.ReaderFrom, io.ByteWriter:
assert((*BufferedWriter)(nil), Implements, (*Writer)(nil)) default:
assert((*BufferedWriter)(nil), Implements, (*io.Writer)(nil)) t.Error("BufferedWriter is not Writer, io.Writer, io.ReaderFrom or io.ByteWriter")
assert((*BufferedWriter)(nil), Implements, (*io.ReaderFrom)(nil)) }
assert((*BufferedWriter)(nil), Implements, (*io.ByteWriter)(nil)) }
} }

View File

@ -110,14 +110,14 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
} else { } else {
//if we are in TPROXY mode, use linux's udp forging functionality //if we are in TPROXY mode, use linux's udp forging functionality
if !d.config.FollowRedirect { if !d.config.FollowRedirect {
writer = buf.NewSequentialWriter(conn) writer = &buf.SequentialWriter{Writer: conn}
} else { } else {
srca := net.UDPAddr{IP: dest.Address.IP(), Port: int(dest.Port.Value())} srca := net.UDPAddr{IP: dest.Address.IP(), Port: int(dest.Port.Value())}
origsend, err := udp.TransmitSocket(&srca, conn.RemoteAddr()) origsend, err := udp.TransmitSocket(&srca, conn.RemoteAddr())
if err != nil { if err != nil {
return err return err
} }
writer = buf.NewSequentialWriter(origsend) writer = &buf.SequentialWriter{Writer: origsend}
} }
} }

View File

@ -128,7 +128,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
if destination.Network == net.Network_TCP { if destination.Network == net.Network_TCP {
writer = buf.NewWriter(conn) writer = buf.NewWriter(conn)
} else { } else {
writer = buf.NewSequentialWriter(conn) writer = &buf.SequentialWriter{Writer: conn}
} }
if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil { if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil {
return newError("failed to process request").Base(err) return newError("failed to process request").Base(err)

View File

@ -133,10 +133,10 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
if request.Command == protocol.RequestCommandUDP { if request.Command == protocol.RequestCommandUDP {
writer := buf.NewSequentialWriter(&UDPWriter{ writer := &buf.SequentialWriter{Writer: &UDPWriter{
Writer: conn, Writer: conn,
Request: request, Request: request,
}) }}
requestDone := func() error { requestDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)

View File

@ -142,7 +142,7 @@ func TestUDPReaderWriter(t *testing.T) {
}), }),
} }
cache := buf.New() cache := buf.New()
writer := buf.NewSequentialWriter(&UDPWriter{ writer := &buf.SequentialWriter{Writer: &UDPWriter{
Writer: cache, Writer: cache,
Request: &protocol.RequestHeader{ Request: &protocol.RequestHeader{
Version: Version, Version: Version,
@ -151,7 +151,7 @@ func TestUDPReaderWriter(t *testing.T) {
User: user, User: user,
Option: RequestOptionOneTimeAuth, Option: RequestOptionOneTimeAuth,
}, },
}) }}
reader := &UDPReader{ reader := &UDPReader{
Reader: cache, Reader: cache,

View File

@ -123,7 +123,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
defer udpConn.Close() // nolint: errcheck defer udpConn.Close() // nolint: errcheck
requestFunc = func() error { requestFunc = func() error {
defer timer.SetTimeout(p.Timeouts.DownlinkOnly) defer timer.SetTimeout(p.Timeouts.DownlinkOnly)
return buf.Copy(link.Reader, buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer)) return buf.Copy(link.Reader, &buf.SequentialWriter{Writer: NewUDPWriter(request, udpConn)}, buf.UpdateActivity(timer))
} }
responseFunc = func() error { responseFunc = func() error {
defer timer.SetTimeout(p.Timeouts.UplinkOnly) defer timer.SetTimeout(p.Timeouts.UplinkOnly)

View File

@ -20,7 +20,7 @@ func TestUDPEncoding(t *testing.T) {
Address: net.IPAddress([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6}), Address: net.IPAddress([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6}),
Port: 1024, Port: 1024,
} }
writer := buf.NewSequentialWriter(NewUDPWriter(request, b)) writer := &buf.SequentialWriter{Writer: NewUDPWriter(request, b)}
content := []byte{'a'} content := []byte{'a'}
payload := buf.New() payload := buf.New()