From 7407c8d561191710dd01c3462352f3411dfa3c78 Mon Sep 17 00:00:00 2001 From: v2ray Date: Mon, 18 Apr 2016 18:44:10 +0200 Subject: [PATCH] use stream instead of raw chan --- app/dispatcher/testing/dispatcher.go | 20 +++--- common/io/transport.go | 33 +-------- common/io/transport_test.go | 37 ----------- common/io/writer.go | 1 + common/log/log.go | 4 +- common/log/log_test.go | 2 +- common/net/packet.go | 8 +++ proxy/blackhole/blackhole.go | 17 ++--- proxy/dokodemo/dokodemo.go | 25 +++---- proxy/dokodemo/dokodemo_test.go | 17 +++-- proxy/freedom/freedom.go | 11 +-- proxy/freedom/freedom_test.go | 12 +--- proxy/http/chan_reader.go | 9 +-- proxy/http/http.go | 9 +-- proxy/shadowsocks/shadowsocks.go | 9 +-- proxy/socks/socks.go | 7 +- proxy/testing/mocks/inboundhandler.go | 7 +- proxy/testing/mocks/outboundhandler.go | 7 +- proxy/vmess/inbound/inbound.go | 9 +-- proxy/vmess/outbound/outbound.go | 23 ++----- shell/point/point.go | 8 ++- tools/build/go_test.go | 8 +-- transport/hub/udp_server.go | 10 ++- transport/ray/direct.go | 92 +++++++++++++++++++++++--- transport/ray/ray.go | 20 ++++-- 25 files changed, 217 insertions(+), 188 deletions(-) delete mode 100644 common/io/transport_test.go diff --git a/app/dispatcher/testing/dispatcher.go b/app/dispatcher/testing/dispatcher.go index e81b88c14..532e93fb2 100644 --- a/app/dispatcher/testing/dispatcher.go +++ b/app/dispatcher/testing/dispatcher.go @@ -6,28 +6,32 @@ import ( ) type TestPacketDispatcher struct { - LastPacket chan v2net.Packet - Handler func(packet v2net.Packet, traffic ray.OutboundRay) + Destination chan v2net.Destination + Handler func(packet v2net.Packet, traffic ray.OutboundRay) } func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.OutboundRay)) *TestPacketDispatcher { if handler == nil { handler = func(packet v2net.Packet, traffic ray.OutboundRay) { - for payload := range traffic.OutboundInput() { - traffic.OutboundOutput() <- payload.Prepend([]byte("Processed: ")) + for { + payload, err := traffic.OutboundInput().Read() + if err != nil { + break + } + traffic.OutboundOutput().Write(payload.Prepend([]byte("Processed: "))) } - close(traffic.OutboundOutput()) + traffic.OutboundOutput().Close() } } return &TestPacketDispatcher{ - LastPacket: make(chan v2net.Packet, 16), - Handler: handler, + Destination: make(chan v2net.Destination), + Handler: handler, } } func (this *TestPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay { traffic := ray.NewRay() - this.LastPacket <- packet + this.Destination <- packet.Destination() go this.Handler(packet, traffic) return traffic diff --git a/common/io/transport.go b/common/io/transport.go index 87bbe2d4b..8ca35264c 100644 --- a/common/io/transport.go +++ b/common/io/transport.go @@ -1,43 +1,16 @@ package io -import ( - "io" - - "github.com/v2ray/v2ray-core/common/alloc" -) - -func RawReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error { - return ReaderToChan(stream, NewAdaptiveReader(reader)) -} - -// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF. -func ReaderToChan(stream chan<- *alloc.Buffer, reader Reader) error { +func Pipe(reader Reader, writer Writer) error { for { buffer, err := reader.Read() if buffer.Len() > 0 { - stream <- buffer + err = writer.Write(buffer) } else { buffer.Release() } if err != nil { - return err + return nil } } } - -func ChanToRawWriter(writer io.Writer, stream <-chan *alloc.Buffer) error { - return ChanToWriter(NewAdaptiveWriter(writer), stream) -} - -// ChanToWriter dumps all content from a given chan to a writer until the chan is closed. -func ChanToWriter(writer Writer, stream <-chan *alloc.Buffer) error { - for buffer := range stream { - err := writer.Write(buffer) - buffer.Release() - if err != nil { - return err - } - } - return nil -} diff --git a/common/io/transport_test.go b/common/io/transport_test.go deleted file mode 100644 index fef339d6e..000000000 --- a/common/io/transport_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package io_test - -import ( - "bytes" - "crypto/rand" - "io" - "testing" - - "github.com/v2ray/v2ray-core/common/alloc" - . "github.com/v2ray/v2ray-core/common/io" - v2testing "github.com/v2ray/v2ray-core/testing" - "github.com/v2ray/v2ray-core/testing/assert" -) - -func TestReaderAndWrite(t *testing.T) { - v2testing.Current(t) - - size := 1024 * 1024 - buffer := make([]byte, size) - nBytes, err := rand.Read(buffer) - assert.Int(nBytes).Equals(len(buffer)) - assert.Error(err).IsNil() - - readerBuffer := bytes.NewReader(buffer) - writerBuffer := bytes.NewBuffer(make([]byte, 0, size)) - - transportChan := make(chan *alloc.Buffer, 1024) - - err = ReaderToChan(transportChan, NewAdaptiveReader(readerBuffer)) - assert.Error(err).Equals(io.EOF) - close(transportChan) - - err = ChanToRawWriter(writerBuffer, transportChan) - assert.Error(err).IsNil() - - assert.Bytes(buffer).Equals(writerBuffer.Bytes()) -} diff --git a/common/io/writer.go b/common/io/writer.go index 1cefa25d0..8901bd52d 100644 --- a/common/io/writer.go +++ b/common/io/writer.go @@ -32,6 +32,7 @@ func (this *AdaptiveWriter) Write(buffer *alloc.Buffer) error { if nBytes < buffer.Len() { _, err = this.writer.Write(buffer.Value[nBytes:]) } + buffer.Release() return err } diff --git a/common/log/log.go b/common/log/log.go index e8de6834e..dac8501f7 100644 --- a/common/log/log.go +++ b/common/log/log.go @@ -41,8 +41,8 @@ var ( noOpLoggerInstance logWriter = &noOpLogWriter{} streamLoggerInstance logWriter = newStdOutLogWriter() - debugLogger = noOpLoggerInstance - infoLogger = noOpLoggerInstance + debugLogger = streamLoggerInstance + infoLogger = streamLoggerInstance warningLogger = streamLoggerInstance errorLogger = streamLoggerInstance ) diff --git a/common/log/log_test.go b/common/log/log_test.go index f41564bb9..2741024aa 100644 --- a/common/log/log_test.go +++ b/common/log/log_test.go @@ -5,8 +5,8 @@ import ( "log" "testing" + "github.com/v2ray/v2ray-core/common/platform" "github.com/v2ray/v2ray-core/common/serial" - "github.com/v2ray/v2ray-core/common/platform" v2testing "github.com/v2ray/v2ray-core/testing" "github.com/v2ray/v2ray-core/testing/assert" ) diff --git a/common/net/packet.go b/common/net/packet.go index 6a6f59d4c..721548243 100644 --- a/common/net/packet.go +++ b/common/net/packet.go @@ -1,11 +1,14 @@ package net import ( + "github.com/v2ray/v2ray-core/common" "github.com/v2ray/v2ray-core/common/alloc" ) // Packet is a network packet to be sent to destination. type Packet interface { + common.Releasable + Destination() Destination Chunk() *alloc.Buffer // First chunk of this commnunication MoreChunks() bool @@ -37,3 +40,8 @@ func (packet *packetImpl) Chunk() *alloc.Buffer { func (packet *packetImpl) MoreChunks() bool { return packet.moreData } + +func (packet *packetImpl) Release() { + packet.data.Release() + packet.data = nil +} diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index ecab34db0..48e371213 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -1,10 +1,7 @@ package blackhole import ( - "io/ioutil" - "github.com/v2ray/v2ray-core/app" - v2io "github.com/v2ray/v2ray-core/common/io" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" @@ -20,14 +17,14 @@ func NewBlackHole() *BlackHole { } func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { - if chunk := firstPacket.Chunk(); chunk != nil { - chunk.Release() - } + firstPacket.Release() + + ray.OutboundOutput().Close() + ray.OutboundOutput().Release() + + ray.OutboundInput().Close() + ray.OutboundInput().Release() - close(ray.OutboundOutput()) - if firstPacket.MoreChunks() { - v2io.ChanToRawWriter(ioutil.Discard, ray.OutboundInput()) - } return nil } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 83e4a342c..4b41d68c3 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -1,7 +1,6 @@ package dokodemo import ( - "io" "sync" "github.com/v2ray/v2ray-core/app/dispatcher" @@ -126,25 +125,23 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) { packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true) ray := this.packetDispatcher.DispatchToOutbound(packet) + defer ray.InboundOutput().Release() var inputFinish, outputFinish sync.Mutex inputFinish.Lock() outputFinish.Lock() reader := v2net.NewTimeOutReader(this.config.Timeout, conn) - go dumpInput(reader, ray.InboundInput(), &inputFinish) - go dumpOutput(conn, ray.InboundOutput(), &outputFinish) + go func() { + v2io.Pipe(v2io.NewAdaptiveReader(reader), ray.InboundInput()) + inputFinish.Unlock() + ray.InboundInput().Close() + }() + + go func() { + v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(conn)) + outputFinish.Unlock() + }() outputFinish.Lock() } - -func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { - v2io.RawReaderToChan(input, reader) - finish.Unlock() - close(input) -} - -func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) { - v2io.ChanToRawWriter(writer, output) - finish.Unlock() -} diff --git a/proxy/dokodemo/dokodemo_test.go b/proxy/dokodemo/dokodemo_test.go index d62e55192..bbb6f448c 100644 --- a/proxy/dokodemo/dokodemo_test.go +++ b/proxy/dokodemo/dokodemo_test.go @@ -43,7 +43,7 @@ func TestDokodemoTCP(t *testing.T) { tcpClient.Write([]byte(data2Send)) tcpClient.CloseWrite() - lastPacket := <-testPacketDispatcher.LastPacket + destination := <-testPacketDispatcher.Destination response := make([]byte, 1024) nBytes, err := tcpClient.Read(response) @@ -51,9 +51,9 @@ func TestDokodemoTCP(t *testing.T) { tcpClient.Close() assert.StringLiteral("Processed: " + data2Send).Equals(string(response[:nBytes])) - assert.Bool(lastPacket.Destination().IsTCP()).IsTrue() - netassert.Address(lastPacket.Destination().Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4})) - netassert.Port(lastPacket.Destination().Port()).Equals(128) + assert.Bool(destination.IsTCP()).IsTrue() + netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4})) + netassert.Port(destination.Port()).Equals(128) } func TestDokodemoUDP(t *testing.T) { @@ -86,10 +86,9 @@ func TestDokodemoUDP(t *testing.T) { udpClient.Write([]byte(data2Send)) udpClient.Close() - lastPacket := <-testPacketDispatcher.LastPacket + destination := <-testPacketDispatcher.Destination - assert.StringLiteral(data2Send).Equals(string(lastPacket.Chunk().Value)) - assert.Bool(lastPacket.Destination().IsUDP()).IsTrue() - netassert.Address(lastPacket.Destination().Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8})) - netassert.Port(lastPacket.Destination().Port()).Equals(256) + assert.Bool(destination.IsUDP()).IsTrue() + netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8})) + netassert.Port(destination.Port()).Equals(256) } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index b6058f0aa..a3b1fd49e 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -19,6 +19,9 @@ type FreedomConnection struct { func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { log.Info("Freedom: Opening connection to ", firstPacket.Destination()) + defer firstPacket.Release() + defer ray.OutboundInput().Release() + var conn net.Conn err := retry.Timed(5, 100).On(func() error { rawConn, err := dialer.Dial(firstPacket.Destination()) @@ -29,7 +32,6 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou return nil }) if err != nil { - close(ray.OutboundOutput()) log.Error("Freedom: Failed to open connection to ", firstPacket.Destination(), ": ", err) return err } @@ -43,21 +45,20 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou if chunk := firstPacket.Chunk(); chunk != nil { conn.Write(chunk.Value) - chunk.Release() } if !firstPacket.MoreChunks() { writeMutex.Unlock() } else { go func() { - v2io.ChanToRawWriter(conn, input) + v2io.Pipe(input, v2io.NewAdaptiveWriter(conn)) writeMutex.Unlock() }() } go func() { defer readMutex.Unlock() - defer close(output) + defer output.Close() var reader io.Reader = conn @@ -65,7 +66,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou reader = v2net.NewTimeOutReader(16 /* seconds */, conn) } - v2io.RawReaderToChan(output, reader) + v2io.Pipe(v2io.NewAdaptiveReader(reader), output) }() writeMutex.Lock() diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index c1837e5d1..2ed95db88 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -37,15 +37,12 @@ func TestSinglePacket(t *testing.T) { err = freedom.Dispatch(packet, traffic) assert.Error(err).IsNil() - close(traffic.InboundInput()) + traffic.InboundInput().Close() - respPayload := <-traffic.InboundOutput() - defer respPayload.Release() + respPayload, err := traffic.InboundOutput().Read() + assert.Error(err).IsNil() assert.Bytes(respPayload.Value).Equals([]byte("Processed: Data to be sent to remote")) - _, open := <-traffic.InboundOutput() - assert.Bool(open).IsFalse() - tcpServer.Close() } @@ -60,7 +57,4 @@ func TestUnreachableDestination(t *testing.T) { err := freedom.Dispatch(packet, traffic) assert.Error(err).IsNotNil() - - _, open := <-traffic.InboundOutput() - assert.Bool(open).IsFalse() } diff --git a/proxy/http/chan_reader.go b/proxy/http/chan_reader.go index 977bd4e86..def0344da 100644 --- a/proxy/http/chan_reader.go +++ b/proxy/http/chan_reader.go @@ -4,15 +4,16 @@ import ( "io" "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" ) type ChanReader struct { - stream <-chan *alloc.Buffer + stream v2io.Reader current *alloc.Buffer eof bool } -func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader { +func NewChanReader(stream v2io.Reader) *ChanReader { this := &ChanReader{ stream: stream, } @@ -21,9 +22,9 @@ func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader { } func (this *ChanReader) fill() { - b, open := <-this.stream + b, err := this.stream.Read() this.current = b - if !open { + if err != nil { this.eof = true this.current = nil } diff --git a/proxy/http/http.go b/proxy/http/http.go index dff1bbad0..dacf3996b 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -154,13 +154,14 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra defer wg.Wait() go func() { - v2io.RawReaderToChan(ray.InboundInput(), input) - close(ray.InboundInput()) + v2io.Pipe(v2io.NewAdaptiveReader(input), ray.InboundInput()) + ray.InboundInput().Close() wg.Done() }() go func() { - v2io.ChanToRawWriter(output, ray.InboundOutput()) + v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(output)) + ray.InboundOutput().Release() wg.Done() }() } @@ -222,7 +223,7 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D packet := v2net.NewPacket(dest, requestBuffer, true) ray := this.packetDispatcher.DispatchToOutbound(packet) - defer close(ray.InboundInput()) + defer ray.InboundInput().Close() var wg sync.WaitGroup wg.Add(1) diff --git a/proxy/shadowsocks/shadowsocks.go b/proxy/shadowsocks/shadowsocks.go index 9f721fb4c..671aed993 100644 --- a/proxy/shadowsocks/shadowsocks.go +++ b/proxy/shadowsocks/shadowsocks.go @@ -204,7 +204,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { var writeFinish sync.Mutex writeFinish.Lock() go func() { - if payload, ok := <-ray.InboundOutput(); ok { + if payload, err := ray.InboundOutput().Read(); err == nil { payload.SliceBack(ivLen) rand.Read(payload.Value[:ivLen]) @@ -219,7 +219,8 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { payload.Release() writer := crypto.NewCryptionWriter(stream, conn) - v2io.ChanToRawWriter(writer, ray.InboundOutput()) + v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(writer)) + ray.InboundOutput().Release() } writeFinish.Unlock() }() @@ -232,8 +233,8 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { payloadReader = v2io.NewAdaptiveReader(reader) } - v2io.ReaderToChan(ray.InboundInput(), payloadReader) - close(ray.InboundInput()) + v2io.Pipe(payloadReader, ray.InboundInput()) + ray.InboundInput().Close() payloadReader.Release() writeFinish.Lock() diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 933cb1bc5..5b7125d3a 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -276,14 +276,15 @@ func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPack outputFinish.Lock() go func() { - v2io.RawReaderToChan(input, reader) + v2io.Pipe(v2io.NewAdaptiveReader(reader), input) inputFinish.Unlock() - close(input) + input.Close() }() go func() { - v2io.ChanToRawWriter(writer, output) + v2io.Pipe(output, v2io.NewAdaptiveWriter(writer)) outputFinish.Unlock() + output.Release() }() outputFinish.Lock() } diff --git a/proxy/testing/mocks/inboundhandler.go b/proxy/testing/mocks/inboundhandler.go index 745e10237..d350a4b0f 100644 --- a/proxy/testing/mocks/inboundhandler.go +++ b/proxy/testing/mocks/inboundhandler.go @@ -42,13 +42,14 @@ func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error { writeFinish.Lock() go func() { - v2io.RawReaderToChan(input, this.ConnInput) - close(input) + v2io.Pipe(v2io.NewAdaptiveReader(this.ConnInput), input) + input.Close() readFinish.Unlock() }() go func() { - v2io.ChanToRawWriter(this.ConnOutput, output) + v2io.Pipe(output, v2io.NewAdaptiveWriter(this.ConnOutput)) + output.Release() writeFinish.Unlock() }() diff --git a/proxy/testing/mocks/outboundhandler.go b/proxy/testing/mocks/outboundhandler.go index 99a34fe41..fc9517ffa 100644 --- a/proxy/testing/mocks/outboundhandler.go +++ b/proxy/testing/mocks/outboundhandler.go @@ -33,15 +33,16 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out writeFinish.Lock() go func() { - v2io.ChanToRawWriter(this.ConnOutput, input) + v2io.Pipe(input, v2io.NewAdaptiveWriter(this.ConnOutput)) writeFinish.Unlock() + input.Release() }() writeFinish.Lock() } - v2io.RawReaderToChan(output, this.ConnInput) - close(output) + v2io.Pipe(v2io.NewAdaptiveReader(this.ConnInput), output) + output.Close() return nil } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 3a8f7c38f..05ee9ac75 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -145,7 +145,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { connReader.SetTimeOut(userSettings.PayloadReadTimeout) reader.SetCached(false) go func() { - defer close(input) + defer input.Close() defer readFinish.Unlock() bodyReader := session.DecodeRequestBody(reader) var requestReader v2io.Reader @@ -154,7 +154,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { } else { requestReader = v2io.NewAdaptiveReader(bodyReader) } - v2io.ReaderToChan(input, requestReader) + v2io.Pipe(requestReader, input) requestReader.Release() }() @@ -170,7 +170,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { bodyWriter := session.EncodeResponseBody(writer) // Optimize for small response packet - if data, open := <-output; open { + if data, err := output.Read(); err == nil { if request.Option.IsChunkStream() { vmessio.Authenticate(data) } @@ -183,7 +183,8 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { if request.Option.IsChunkStream() { writer = vmessio.NewAuthChunkWriter(writer) } - v2io.ChanToWriter(writer, output) + v2io.Pipe(output, writer) + output.Release() writer.Release() finish.Unlock() }(&writeFinish) diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index c0e92b442..2914eb528 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/v2ray/v2ray-core/app" - "github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -60,7 +59,7 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader, if err != nil { log.Error("Failed to open ", dest, ": ", err) if ray != nil { - close(ray.OutboundOutput()) + ray.OutboundOutput().Close() } return err } @@ -83,10 +82,12 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader, requestFinish.Lock() conn.CloseWrite() responseFinish.Lock() + output.Close() + input.Release() return nil } -func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input <-chan *alloc.Buffer, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input v2io.Reader, finish *sync.Mutex) { defer finish.Unlock() writer := v2io.NewBufferedWriter(conn) @@ -97,15 +98,6 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn firstChunk := firstPacket.Chunk() moreChunks := firstPacket.MoreChunks() - for firstChunk == nil && moreChunks { - firstChunk, moreChunks = <-input - } - - if firstChunk == nil && !moreChunks { - log.Warning("VMessOut: Nothing to send. Existing...") - return - } - if request.Option.IsChunkStream() { vmessio.Authenticate(firstChunk) } @@ -121,15 +113,14 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn if request.Option.IsChunkStream() { streamWriter = vmessio.NewAuthChunkWriter(streamWriter) } - v2io.ChanToWriter(streamWriter, input) + v2io.Pipe(input, streamWriter) streamWriter.Release() } return } -func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output chan<- *alloc.Buffer, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) { defer finish.Unlock() - defer close(output) reader := v2io.NewBufferedReader(conn) defer reader.Release() @@ -151,7 +142,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con bodyReader = v2io.NewAdaptiveReader(decryptReader) } - v2io.ReaderToChan(output, bodyReader) + v2io.Pipe(bodyReader, output) bodyReader.Release() return diff --git a/shell/point/point.go b/shell/point/point.go index 1b423958f..fcff87d45 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -199,13 +199,17 @@ func (this *Point) FilterPacketAndDispatch(packet v2net.Packet, link ray.Outboun chunk := packet.Chunk() moreChunks := packet.MoreChunks() changed := false + var err error for chunk == nil && moreChunks { changed = true - chunk, moreChunks = <-link.OutboundInput() + chunk, err = link.OutboundInput().Read() + if err != nil { + moreChunks = false + } } if chunk == nil && !moreChunks { log.Info("Point: No payload to dispatch, stopping dispatching now.") - close(link.OutboundOutput()) + link.OutboundOutput().Close() return } diff --git a/tools/build/go_test.go b/tools/build/go_test.go index 11c9cadc0..bd6c4f426 100644 --- a/tools/build/go_test.go +++ b/tools/build/go_test.go @@ -20,10 +20,10 @@ func TestBuildAndRun(t *testing.T) { gopath := os.Getenv("GOPATH") goOS := parseOS(runtime.GOOS) goArch := parseArch(runtime.GOARCH) - target := filepath.Join(gopath, "src", "v2ray_test") - if goOS == Windows { - target += ".exe" - } + target := filepath.Join(gopath, "src", "v2ray_test") + if goOS == Windows { + target += ".exe" + } err := buildV2Ray(target, "v1.0", goOS, goArch) assert.Error(err).IsNil() diff --git a/transport/hub/udp_server.go b/transport/hub/udp_server.go index 9528e4fd9..7e61262ed 100644 --- a/transport/hub/udp_server.go +++ b/transport/hub/udp_server.go @@ -32,7 +32,7 @@ func (this *UDPServer) locateExistingAndDispatch(dest string, packet v2net.Packe this.RLock() defer this.RUnlock() if entry, found := this.conns[dest]; found { - entry.inboundRay.InboundInput() <- packet.Chunk() + entry.inboundRay.InboundInput().Write(packet.Chunk()) return true } return false @@ -55,8 +55,12 @@ func (this *UDPServer) Dispatch(source v2net.Destination, packet v2net.Packet, c } func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) { - for buffer := range inboundRay.InboundOutput() { - callback(v2net.NewPacket(source, buffer, false)) + for { + data, err := inboundRay.InboundOutput().Read() + if err != nil { + break + } + callback(v2net.NewPacket(source, data, false)) } this.Lock() delete(this.conns, destString) diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 25495143f..38b538439 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -1,6 +1,9 @@ package ray import ( + "io" + "sync" + "github.com/v2ray/v2ray-core/common/alloc" ) @@ -11,28 +14,101 @@ const ( // NewRay creates a new Ray for direct traffic transport. func NewRay() Ray { return &directRay{ - Input: make(chan *alloc.Buffer, bufferSize), - Output: make(chan *alloc.Buffer, bufferSize), + Input: NewStream(), + Output: NewStream(), } } type directRay struct { - Input chan *alloc.Buffer - Output chan *alloc.Buffer + Input *Stream + Output *Stream } -func (this *directRay) OutboundInput() <-chan *alloc.Buffer { +func (this *directRay) OutboundInput() InputStream { return this.Input } -func (this *directRay) OutboundOutput() chan<- *alloc.Buffer { +func (this *directRay) OutboundOutput() OutputStream { return this.Output } -func (this *directRay) InboundInput() chan<- *alloc.Buffer { +func (this *directRay) InboundInput() OutputStream { return this.Input } -func (this *directRay) InboundOutput() <-chan *alloc.Buffer { +func (this *directRay) InboundOutput() InputStream { return this.Output } + +type Stream struct { + access sync.RWMutex + closed bool + buffer chan *alloc.Buffer +} + +func NewStream() *Stream { + return &Stream{ + buffer: make(chan *alloc.Buffer, bufferSize), + } +} + +func (this *Stream) Read() (*alloc.Buffer, error) { + if this.buffer == nil { + return nil, io.EOF + } + this.access.RLock() + defer this.access.RUnlock() + if this.buffer == nil { + return nil, io.EOF + } + result, open := <-this.buffer + if !open { + return nil, io.EOF + } + return result, nil +} + +func (this *Stream) Write(data *alloc.Buffer) error { + if this.closed { + return io.EOF + } + if this.buffer == nil { + return io.EOF + } + this.access.RLock() + defer this.access.RUnlock() + if this.buffer == nil { + return io.EOF + } + this.buffer <- data + return nil +} + +func (this *Stream) Close() { + if this.closed { + return + } + this.access.RLock() + defer this.access.RUnlock() + if this.closed { + return + } + this.closed = true + close(this.buffer) +} + +func (this *Stream) Release() { + if this.buffer == nil { + return + } + this.Close() + this.access.Lock() + defer this.access.Unlock() + if this.buffer == nil { + return + } + for data := range this.buffer { + data.Release() + } + this.buffer = nil +} diff --git a/transport/ray/ray.go b/transport/ray/ray.go index 0dcc0c983..91812a973 100644 --- a/transport/ray/ray.go +++ b/transport/ray/ray.go @@ -1,19 +1,19 @@ package ray import ( - "github.com/v2ray/v2ray-core/common/alloc" + v2io "github.com/v2ray/v2ray-core/common/io" ) // OutboundRay is a transport interface for outbound connections. type OutboundRay interface { // OutboundInput provides a stream for the input of the outbound connection. // The outbound connection shall write all the input until it is closed. - OutboundInput() <-chan *alloc.Buffer + OutboundInput() InputStream // OutboundOutput provides a stream to retrieve the response from the // outbound connection. The outbound connection shall close the channel // after all responses are receivced and put into the channel. - OutboundOutput() chan<- *alloc.Buffer + OutboundOutput() OutputStream } // InboundRay is a transport interface for inbound connections. @@ -21,12 +21,12 @@ type InboundRay interface { // InboundInput provides a stream to retrieve the request from client. // The inbound connection shall close the channel after the entire request // is received and put into the channel. - InboundInput() chan<- *alloc.Buffer + InboundInput() OutputStream // InboudBound provides a stream of data for the inbound connection to write // as response. The inbound connection shall write all the data from the // channel until it is closed. - InboundOutput() <-chan *alloc.Buffer + InboundOutput() InputStream } // Ray is an internal tranport channel between inbound and outbound connection. @@ -34,3 +34,13 @@ type Ray interface { InboundRay OutboundRay } + +type InputStream interface { + v2io.Reader + Close() +} + +type OutputStream interface { + v2io.Writer + Close() +}