From 008c285324fc73c784886d2abe1e5dc820616423 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 29 Jan 2016 13:39:55 +0000 Subject: [PATCH] move transport methods from net to io --- common/alloc/buffer.go | 7 ++ common/crypto/authenticator.go | 2 +- common/io/reader.go | 122 ++++++++++++++++++++ common/io/transport.go | 42 +++++++ common/io/transport_test.go | 37 ++++++ common/net/transport.go | 96 ---------------- common/net/transport_test.go | 153 ------------------------- proxy/blackhole/blackhole.go | 3 +- proxy/dokodemo/dokodemo.go | 5 +- proxy/freedom/freedom.go | 7 +- proxy/freedom/freedom_test.go | 3 +- proxy/http/http.go | 5 +- proxy/shadowsocks/shadowsocks.go | 7 +- proxy/socks/socks.go | 9 +- proxy/testing/mocks/inboundhandler.go | 5 +- proxy/testing/mocks/outboundhandler.go | 5 +- proxy/vmess/inbound/inbound.go | 5 +- proxy/vmess/outbound/outbound.go | 7 +- proxy/vmess/protocol/vmess_test.go | 2 +- testing/servers/tcp/tcp.go | 3 +- transport/hub/udp.go | 2 +- 21 files changed, 249 insertions(+), 278 deletions(-) create mode 100644 common/io/reader.go create mode 100644 common/io/transport.go create mode 100644 common/io/transport_test.go delete mode 100644 common/net/transport.go delete mode 100644 common/net/transport_test.go diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 18d2c8c6d..929d2a624 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -11,6 +11,13 @@ func Release(buffer *Buffer) { } } +func Len(buffer *Buffer) int { + if buffer == nil { + return 0 + } + return buffer.Len() +} + // Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles // the buffer into an internal buffer pool, in order to recreate a buffer more // quickly. diff --git a/common/crypto/authenticator.go b/common/crypto/authenticator.go index 51072e03a..4cb7f8952 100644 --- a/common/crypto/authenticator.go +++ b/common/crypto/authenticator.go @@ -1,6 +1,6 @@ package crypto type Authenticator interface { - AuthBytes() int + AuthSize() int Authenticate(auth []byte, data []byte) []byte } diff --git a/common/io/reader.go b/common/io/reader.go new file mode 100644 index 000000000..33bc7aa09 --- /dev/null +++ b/common/io/reader.go @@ -0,0 +1,122 @@ +package io // import "github.com/v2ray/v2ray-core/common/io" + +import ( + "io" + + "github.com/v2ray/v2ray-core/common/alloc" + "github.com/v2ray/v2ray-core/common/crypto" + "github.com/v2ray/v2ray-core/common/serial" + "github.com/v2ray/v2ray-core/transport" +) + +// ReadFrom reads from a reader and put all content to a buffer. +// If buffer is nil, ReadFrom creates a new normal buffer. +func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) { + if buffer == nil { + buffer = alloc.NewBuffer() + } + nBytes, err := reader.Read(buffer.Value) + buffer.Slice(0, nBytes) + return buffer, err +} + +type Reader interface { + Read() (*alloc.Buffer, error) +} + +type AdaptiveReader struct { + reader io.Reader + allocate func() *alloc.Buffer + isLarge bool +} + +func NewAdaptiveReader(reader io.Reader) *AdaptiveReader { + return &AdaptiveReader{ + reader: reader, + allocate: alloc.NewBuffer, + isLarge: false, + } +} + +func (this *AdaptiveReader) Read() (*alloc.Buffer, error) { + buffer, err := ReadFrom(this.reader, this.allocate()) + + if buffer.IsFull() && !this.isLarge { + this.allocate = alloc.NewLargeBuffer + this.isLarge = true + } else if !buffer.IsFull() { + this.allocate = alloc.NewBuffer + this.isLarge = false + } + + if err != nil { + alloc.Release(buffer) + return nil, err + } + return buffer, nil +} + +type ChunkReader struct { + reader io.Reader +} + +func NewChunkReader(reader io.Reader) *ChunkReader { + return &ChunkReader{ + reader: reader, + } +} + +func (this *ChunkReader) Read() (*alloc.Buffer, error) { + buffer := alloc.NewLargeBuffer() + if _, err := io.ReadFull(this.reader, buffer.Value[:2]); err != nil { + alloc.Release(buffer) + return nil, err + } + length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value() + if _, err := io.ReadFull(this.reader, buffer.Value[:length]); err != nil { + alloc.Release(buffer) + return nil, err + } + buffer.Slice(0, int(length)) + return buffer, nil +} + +type AuthenticationReader struct { + reader Reader + authenticator crypto.Authenticator + authBeforePayload bool +} + +func NewAuthenticationReader(reader io.Reader, auth crypto.Authenticator, authBeforePayload bool) *AuthenticationReader { + return &AuthenticationReader{ + reader: NewChunkReader(reader), + authenticator: auth, + authBeforePayload: authBeforePayload, + } +} + +func (this *AuthenticationReader) Read() (*alloc.Buffer, error) { + buffer, err := this.reader.Read() + if err != nil { + alloc.Release(buffer) + return nil, err + } + + authSize := this.authenticator.AuthSize() + var authBytes, payloadBytes []byte + if this.authBeforePayload { + authBytes = buffer.Value[:authSize] + payloadBytes = buffer.Value[authSize:] + } else { + payloadBytes = buffer.Value[:authSize] + authBytes = buffer.Value[authSize:] + } + + actualAuthBytes := this.authenticator.Authenticate(nil, payloadBytes) + if !serial.BytesLiteral(authBytes).Equals(serial.BytesLiteral(actualAuthBytes)) { + alloc.Release(buffer) + return nil, transport.CorruptedPacket + } + buffer.Value = payloadBytes + return buffer, nil +} diff --git a/common/io/transport.go b/common/io/transport.go new file mode 100644 index 000000000..4908abe90 --- /dev/null +++ b/common/io/transport.go @@ -0,0 +1,42 @@ +package io + +import ( + "io" + + "github.com/v2ray/v2ray-core/common/alloc" +) + +func RawReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error { + return ReaderToChan(stream, NewAdaptiveReader(reader)) +} + +// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. +func ReaderToChan(stream chan<- *alloc.Buffer, reader Reader) error { + for { + buffer, err := reader.Read() + if alloc.Len(buffer) > 0 { + stream <- buffer + } else { + alloc.Release(buffer) + } + + if err != nil { + return err + } + } +} + +// ChanToWriter dumps all content from a given chan to a writer until the chan is closed. +func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error { + for buffer := range stream { + nBytes, err := writer.Write(buffer.Value) + if nBytes < buffer.Len() { + _, err = writer.Write(buffer.Value[nBytes:]) + } + buffer.Release() + if err != nil { + return err + } + } + return nil +} diff --git a/common/io/transport_test.go b/common/io/transport_test.go new file mode 100644 index 000000000..3670c56ce --- /dev/null +++ b/common/io/transport_test.go @@ -0,0 +1,37 @@ +package io_test + +import ( + "bytes" + "crypto/rand" + "io" + "testing" + + "github.com/v2ray/v2ray-core/common/alloc" + . "github.com/v2ray/v2ray-core/common/io" + v2testing "github.com/v2ray/v2ray-core/testing" + "github.com/v2ray/v2ray-core/testing/assert" +) + +func TestReaderAndWrite(t *testing.T) { + v2testing.Current(t) + + size := 1024 * 1024 + buffer := make([]byte, size) + nBytes, err := rand.Read(buffer) + assert.Int(nBytes).Equals(len(buffer)) + assert.Error(err).IsNil() + + readerBuffer := bytes.NewReader(buffer) + writerBuffer := bytes.NewBuffer(make([]byte, 0, size)) + + transportChan := make(chan *alloc.Buffer, 1024) + + err = ReaderToChan(transportChan, NewAdaptiveReader(readerBuffer)) + assert.Error(err).Equals(io.EOF) + close(transportChan) + + err = ChanToWriter(writerBuffer, transportChan) + assert.Error(err).IsNil() + + assert.Bytes(buffer).Equals(writerBuffer.Bytes()) +} diff --git a/common/net/transport.go b/common/net/transport.go deleted file mode 100644 index a527fbb58..000000000 --- a/common/net/transport.go +++ /dev/null @@ -1,96 +0,0 @@ -package net - -import ( - "io" - - "github.com/v2ray/v2ray-core/common/alloc" - "github.com/v2ray/v2ray-core/common/crypto" - "github.com/v2ray/v2ray-core/common/serial" - "github.com/v2ray/v2ray-core/transport" -) - -// ReadFrom reads from a reader and put all content to a buffer. -// If buffer is nil, ReadFrom creates a new normal buffer. -func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) { - if buffer == nil { - buffer = alloc.NewBuffer() - } - nBytes, err := reader.Read(buffer.Value) - buffer.Slice(0, nBytes) - return buffer, err -} - -func ReadChunk(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) { - if buffer == nil { - buffer = alloc.NewBuffer() - } - if _, err := io.ReadFull(reader, buffer.Value[:2]); err != nil { - alloc.Release(buffer) - return nil, err - } - length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value() - if _, err := io.ReadFull(reader, buffer.Value[:length]); err != nil { - alloc.Release(buffer) - return nil, err - } - buffer.Slice(0, int(length)) - return buffer, nil -} - -func ReadAuthenticatedChunk(reader io.Reader, auth crypto.Authenticator, buffer *alloc.Buffer) (*alloc.Buffer, error) { - buffer, err := ReadChunk(reader, buffer) - if err != nil { - alloc.Release(buffer) - return nil, err - } - authSize := auth.AuthBytes() - - authBytes := auth.Authenticate(nil, buffer.Value[authSize:]) - - if !serial.BytesLiteral(authBytes).Equals(serial.BytesLiteral(buffer.Value[:authSize])) { - alloc.Release(buffer) - return nil, transport.CorruptedPacket - } - buffer.SliceFrom(authSize) - - return buffer, nil -} - -// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. -func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error { - allocate := alloc.NewBuffer - large := false - for { - buffer, err := ReadFrom(reader, allocate()) - if buffer.Len() > 0 { - stream <- buffer - } else { - buffer.Release() - } - if err != nil { - return err - } - if buffer.IsFull() && !large { - allocate = alloc.NewLargeBuffer - large = true - } else if !buffer.IsFull() { - allocate = alloc.NewBuffer - large = false - } - } -} - -// ChanToWriter dumps all content from a given chan to a writer until the chan is closed. -func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error { - for buffer := range stream { - nBytes, err := writer.Write(buffer.Value) - if nBytes < buffer.Len() { - _, err = writer.Write(buffer.Value[nBytes:]) - } - buffer.Release() - if err != nil { - return err - } - } - return nil -} diff --git a/common/net/transport_test.go b/common/net/transport_test.go deleted file mode 100644 index 4df1db023..000000000 --- a/common/net/transport_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package net_test - -import ( - "bytes" - "crypto/rand" - "io" - "io/ioutil" - "testing" - - "github.com/v2ray/v2ray-core/common/alloc" - v2net "github.com/v2ray/v2ray-core/common/net" - v2testing "github.com/v2ray/v2ray-core/testing" - "github.com/v2ray/v2ray-core/testing/assert" -) - -func TestReaderAndWrite(t *testing.T) { - v2testing.Current(t) - - size := 1024 * 1024 - buffer := make([]byte, size) - nBytes, err := rand.Read(buffer) - assert.Int(nBytes).Equals(len(buffer)) - assert.Error(err).IsNil() - - readerBuffer := bytes.NewReader(buffer) - writerBuffer := bytes.NewBuffer(make([]byte, 0, size)) - - transportChan := make(chan *alloc.Buffer, 1024) - - err = v2net.ReaderToChan(transportChan, readerBuffer) - assert.Error(err).Equals(io.EOF) - close(transportChan) - - err = v2net.ChanToWriter(writerBuffer, transportChan) - assert.Error(err).IsNil() - - assert.Bytes(buffer).Equals(writerBuffer.Bytes()) -} - -type StaticReader struct { - total int - current int -} - -func (reader *StaticReader) Read(b []byte) (size int, err error) { - size = len(b) - if size > reader.total-reader.current { - size = reader.total - reader.current - } - for i := 0; i < size; i++ { - b[i] = byte(i) - } - //rand.Read(b[:size]) - reader.current += size - if reader.current == reader.total { - err = io.EOF - } - return -} - -func BenchmarkTransport1K(b *testing.B) { - size := 1 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func BenchmarkTransport2K(b *testing.B) { - size := 2 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func BenchmarkTransport4K(b *testing.B) { - size := 4 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func BenchmarkTransport10K(b *testing.B) { - size := 10 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func BenchmarkTransport100K(b *testing.B) { - size := 100 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func BenchmarkTransport1M(b *testing.B) { - size := 1024 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func BenchmarkTransport10M(b *testing.B) { - size := 10 * 1024 * 1024 - - for i := 0; i < b.N; i++ { - runBenchmarkTransport(size) - } -} - -func runBenchmarkTransport(size int) { - - transportChanA := make(chan *alloc.Buffer, 16) - transportChanB := make(chan *alloc.Buffer, 16) - - readerA := &StaticReader{size, 0} - readerB := &StaticReader{size, 0} - - writerA := ioutil.Discard - writerB := ioutil.Discard - - finishA := make(chan bool) - finishB := make(chan bool) - - go func() { - v2net.ChanToWriter(writerA, transportChanA) - close(finishA) - }() - - go func() { - v2net.ReaderToChan(transportChanA, readerA) - close(transportChanA) - }() - - go func() { - v2net.ChanToWriter(writerB, transportChanB) - close(finishB) - }() - - go func() { - v2net.ReaderToChan(transportChanB, readerB) - close(transportChanB) - }() - - <-transportChanA - <-transportChanB -} diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index 35a6097c9..daa1dfe06 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "github.com/v2ray/v2ray-core/app" + v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" @@ -25,7 +26,7 @@ func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) e close(ray.OutboundOutput()) if firstPacket.MoreChunks() { - v2net.ChanToWriter(ioutil.Discard, ray.OutboundInput()) + v2io.ChanToWriter(ioutil.Discard, ray.OutboundInput()) } return nil } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 0b9be5e1d..61da11736 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -6,6 +6,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" @@ -140,12 +141,12 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) { } func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { - v2net.ReaderToChan(input, reader) + v2io.RawReaderToChan(input, reader) finish.Unlock() close(input) } func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) { - v2net.ChanToWriter(writer, output) + v2io.ChanToWriter(writer, output) finish.Unlock() } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index c3e11f19f..e37cba31e 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/v2ray/v2ray-core/app" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/common/retry" @@ -50,7 +51,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou writeMutex.Unlock() } else { go func() { - v2net.ChanToWriter(conn, input) + v2io.ChanToWriter(conn, input) writeMutex.Unlock() }() } @@ -59,7 +60,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou defer readMutex.Unlock() defer close(output) - response, err := v2net.ReadFrom(conn, nil) + response, err := v2io.ReadFrom(conn, nil) log.Info("Freedom receives ", response.Len(), " bytes from ", conn.RemoteAddr()) if response.Len() > 0 { output <- response @@ -73,7 +74,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou return } - v2net.ReaderToChan(output, conn) + v2io.RawReaderToChan(output, conn) }() if this.space.HasDnsCache() { diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index 2d227019c..bea4feb4d 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -10,6 +10,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" v2nettesting "github.com/v2ray/v2ray-core/common/net/testing" v2proxy "github.com/v2ray/v2ray-core/proxy" @@ -128,7 +129,7 @@ func TestSocksTcpConnect(t *testing.T) { tcpConn.CloseWrite() } - dataReturned, err := v2net.ReadFrom(conn, nil) + dataReturned, err := v2io.ReadFrom(conn, nil) assert.Error(err).IsNil() conn.Close() diff --git a/proxy/http/http.go b/proxy/http/http.go index f9cb1bfe6..e5f4b5ca9 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -11,6 +11,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/common/serial" @@ -153,13 +154,13 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra defer wg.Wait() go func() { - v2net.ReaderToChan(ray.InboundInput(), input) + v2io.RawReaderToChan(ray.InboundInput(), input) close(ray.InboundInput()) wg.Done() }() go func() { - v2net.ChanToWriter(output, ray.InboundOutput()) + v2io.ChanToWriter(output, ray.InboundOutput()) wg.Done() }() } diff --git a/proxy/shadowsocks/shadowsocks.go b/proxy/shadowsocks/shadowsocks.go index 4c0332120..e6f207c5b 100644 --- a/proxy/shadowsocks/shadowsocks.go +++ b/proxy/shadowsocks/shadowsocks.go @@ -9,6 +9,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" @@ -84,7 +85,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, dest v2net.Des return } - buffer, _ := v2net.ReadFrom(reader, nil) + buffer, _ := v2io.ReadFrom(reader, nil) packet := v2net.NewPacket(v2net.TCPDestination(request.Address, request.Port), buffer, false) ray := this.space.PacketDispatcher().DispatchToOutbound(packet) @@ -168,12 +169,12 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { payload.Release() writer.Write(firstChunk.Value) - v2net.ChanToWriter(writer, ray.InboundOutput()) + v2io.ChanToWriter(writer, ray.InboundOutput()) } writeFinish.Unlock() }() - v2net.ReaderToChan(ray.InboundInput(), reader) + v2io.RawReaderToChan(ray.InboundInput(), reader) close(ray.InboundInput()) writeFinish.Lock() diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 620667e33..ef9588de2 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -9,6 +9,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" @@ -227,8 +228,8 @@ func (this *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writer return err } - reader.SetTimeOut(300) /* 5 minutes */ - v2net.ReadFrom(reader, nil) // Just in case of anything left in the socket + reader.SetTimeOut(300) /* 5 minutes */ + v2io.ReadFrom(reader, nil) // Just in case of anything left in the socket // The TCP connection closes after this method returns. We need to wait until // the client closes it. // TODO: get notified from UDP part @@ -270,13 +271,13 @@ func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPack outputFinish.Lock() go func() { - v2net.ReaderToChan(input, reader) + v2io.RawReaderToChan(input, reader) inputFinish.Unlock() close(input) }() go func() { - v2net.ChanToWriter(writer, output) + v2io.ChanToWriter(writer, output) outputFinish.Unlock() }() outputFinish.Lock() diff --git a/proxy/testing/mocks/inboundhandler.go b/proxy/testing/mocks/inboundhandler.go index f9416215a..e7dd10a6c 100644 --- a/proxy/testing/mocks/inboundhandler.go +++ b/proxy/testing/mocks/inboundhandler.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/v2ray/v2ray-core/app" + v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" ) @@ -41,13 +42,13 @@ func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error { writeFinish.Lock() go func() { - v2net.ReaderToChan(input, this.ConnInput) + v2io.RawReaderToChan(input, this.ConnInput) close(input) readFinish.Unlock() }() go func() { - v2net.ChanToWriter(this.ConnOutput, output) + v2io.ChanToWriter(this.ConnOutput, output) writeFinish.Unlock() }() diff --git a/proxy/testing/mocks/outboundhandler.go b/proxy/testing/mocks/outboundhandler.go index 30613ce51..c454f1613 100644 --- a/proxy/testing/mocks/outboundhandler.go +++ b/proxy/testing/mocks/outboundhandler.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/v2ray/v2ray-core/app" + v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/transport/ray" @@ -32,14 +33,14 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out writeFinish.Lock() go func() { - v2net.ChanToWriter(this.ConnOutput, input) + v2io.ChanToWriter(this.ConnOutput, input) writeFinish.Unlock() }() writeFinish.Lock() } - v2net.ReaderToChan(output, this.ConnInput) + v2io.RawReaderToChan(output, this.ConnInput) close(output) return nil diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 43ca5787e..e8fdb092b 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -8,6 +8,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" v2crypto "github.com/v2ray/v2ray-core/common/crypto" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/common/serial" @@ -136,11 +137,11 @@ func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- return } requestReader := v2crypto.NewCryptionReader(aesStream, reader) - v2net.ReaderToChan(input, requestReader) + v2io.RawReaderToChan(input, requestReader) } func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) { - v2net.ChanToWriter(writer, output) + v2io.ChanToWriter(writer, output) finish.Unlock() } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index da96bfd83..744e2b3b4 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -11,6 +11,7 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" v2crypto "github.com/v2ray/v2ray-core/common/crypto" + v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" @@ -132,7 +133,7 @@ func (this *VMessOutboundHandler) handleRequest(conn net.Conn, request *protocol } if moreChunks { - v2net.ChanToWriter(encryptRequestWriter, input) + v2io.ChanToWriter(encryptRequestWriter, input) } return } @@ -154,7 +155,7 @@ func (this *VMessOutboundHandler) handleResponse(conn net.Conn, request *protoco } decryptResponseReader := v2crypto.NewCryptionReader(aesStream, conn) - buffer, err := v2net.ReadFrom(decryptResponseReader, nil) + buffer, err := v2io.ReadFrom(decryptResponseReader, nil) if err != nil { log.Error("VMessOut: Failed to read VMess response (", buffer.Len(), " bytes): ", err) buffer.Release() @@ -184,7 +185,7 @@ func (this *VMessOutboundHandler) handleResponse(conn net.Conn, request *protoco output <- buffer if !isUDP { - v2net.ReaderToChan(output, decryptResponseReader) + v2io.RawReaderToChan(output, decryptResponseReader) } return diff --git a/proxy/vmess/protocol/vmess_test.go b/proxy/vmess/protocol/vmess_test.go index c61b73342..b8545d862 100644 --- a/proxy/vmess/protocol/vmess_test.go +++ b/proxy/vmess/protocol/vmess_test.go @@ -84,7 +84,7 @@ func TestReadSingleByte(t *testing.T) { reader := NewVMessRequestReader(nil) _, err := reader.Read(bytes.NewReader(make([]byte, 1))) - assert.Error(err).Equals(io.EOF) + assert.Error(err).Equals(io.ErrUnexpectedEOF) } func BenchmarkVMessRequestWriting(b *testing.B) { diff --git a/testing/servers/tcp/tcp.go b/testing/servers/tcp/tcp.go index ed2e36c9c..d4362d087 100644 --- a/testing/servers/tcp/tcp.go +++ b/testing/servers/tcp/tcp.go @@ -4,6 +4,7 @@ import ( "fmt" "net" + v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" ) @@ -43,7 +44,7 @@ func (server *Server) acceptConnections(listener *net.TCPListener) { func (server *Server) handleConnection(conn net.Conn) { for true { - request, err := v2net.ReadFrom(conn, nil) + request, err := v2io.ReadFrom(conn, nil) if err != nil { break } diff --git a/transport/hub/udp.go b/transport/hub/udp.go index de519fec0..3b21a62e6 100644 --- a/transport/hub/udp.go +++ b/transport/hub/udp.go @@ -44,7 +44,7 @@ func (this *UDPHub) WriteTo(payload []byte, dest v2net.Destination) (int, error) } func (this *UDPHub) start() { - this.accepting = true + this.accepting = true for this.accepting { buffer := alloc.NewBuffer() nBytes, addr, err := this.conn.ReadFromUDP(buffer.Value)