1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-22 01:57:12 -05:00

use stream instead of raw chan

This commit is contained in:
v2ray 2016-04-18 18:44:10 +02:00
parent 42b8dbe871
commit 7407c8d561
25 changed files with 217 additions and 188 deletions

View File

@ -6,28 +6,32 @@ import (
) )
type TestPacketDispatcher struct { type TestPacketDispatcher struct {
LastPacket chan v2net.Packet Destination chan v2net.Destination
Handler func(packet v2net.Packet, traffic ray.OutboundRay) Handler func(packet v2net.Packet, traffic ray.OutboundRay)
} }
func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.OutboundRay)) *TestPacketDispatcher { func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.OutboundRay)) *TestPacketDispatcher {
if handler == nil { if handler == nil {
handler = func(packet v2net.Packet, traffic ray.OutboundRay) { handler = func(packet v2net.Packet, traffic ray.OutboundRay) {
for payload := range traffic.OutboundInput() { for {
traffic.OutboundOutput() <- payload.Prepend([]byte("Processed: ")) payload, err := traffic.OutboundInput().Read()
if err != nil {
break
} }
close(traffic.OutboundOutput()) traffic.OutboundOutput().Write(payload.Prepend([]byte("Processed: ")))
}
traffic.OutboundOutput().Close()
} }
} }
return &TestPacketDispatcher{ return &TestPacketDispatcher{
LastPacket: make(chan v2net.Packet, 16), Destination: make(chan v2net.Destination),
Handler: handler, Handler: handler,
} }
} }
func (this *TestPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay { func (this *TestPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay {
traffic := ray.NewRay() traffic := ray.NewRay()
this.LastPacket <- packet this.Destination <- packet.Destination()
go this.Handler(packet, traffic) go this.Handler(packet, traffic)
return traffic return traffic

View File

@ -1,43 +1,16 @@
package io package io
import ( func Pipe(reader Reader, writer Writer) error {
"io"
"github.com/v2ray/v2ray-core/common/alloc"
)
func RawReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
return ReaderToChan(stream, NewAdaptiveReader(reader))
}
// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
func ReaderToChan(stream chan<- *alloc.Buffer, reader Reader) error {
for { for {
buffer, err := reader.Read() buffer, err := reader.Read()
if buffer.Len() > 0 { if buffer.Len() > 0 {
stream <- buffer err = writer.Write(buffer)
} else { } else {
buffer.Release() buffer.Release()
} }
if err != nil { if err != nil {
return err
}
}
}
func ChanToRawWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
return ChanToWriter(NewAdaptiveWriter(writer), stream)
}
// ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
func ChanToWriter(writer Writer, stream <-chan *alloc.Buffer) error {
for buffer := range stream {
err := writer.Write(buffer)
buffer.Release()
if err != nil {
return err
}
}
return nil return nil
} }
}
}

View File

@ -1,37 +0,0 @@
package io_test
import (
"bytes"
"crypto/rand"
"io"
"testing"
"github.com/v2ray/v2ray-core/common/alloc"
. "github.com/v2ray/v2ray-core/common/io"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestReaderAndWrite(t *testing.T) {
v2testing.Current(t)
size := 1024 * 1024
buffer := make([]byte, size)
nBytes, err := rand.Read(buffer)
assert.Int(nBytes).Equals(len(buffer))
assert.Error(err).IsNil()
readerBuffer := bytes.NewReader(buffer)
writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
transportChan := make(chan *alloc.Buffer, 1024)
err = ReaderToChan(transportChan, NewAdaptiveReader(readerBuffer))
assert.Error(err).Equals(io.EOF)
close(transportChan)
err = ChanToRawWriter(writerBuffer, transportChan)
assert.Error(err).IsNil()
assert.Bytes(buffer).Equals(writerBuffer.Bytes())
}

View File

@ -32,6 +32,7 @@ func (this *AdaptiveWriter) Write(buffer *alloc.Buffer) error {
if nBytes < buffer.Len() { if nBytes < buffer.Len() {
_, err = this.writer.Write(buffer.Value[nBytes:]) _, err = this.writer.Write(buffer.Value[nBytes:])
} }
buffer.Release()
return err return err
} }

View File

@ -41,8 +41,8 @@ var (
noOpLoggerInstance logWriter = &noOpLogWriter{} noOpLoggerInstance logWriter = &noOpLogWriter{}
streamLoggerInstance logWriter = newStdOutLogWriter() streamLoggerInstance logWriter = newStdOutLogWriter()
debugLogger = noOpLoggerInstance debugLogger = streamLoggerInstance
infoLogger = noOpLoggerInstance infoLogger = streamLoggerInstance
warningLogger = streamLoggerInstance warningLogger = streamLoggerInstance
errorLogger = streamLoggerInstance errorLogger = streamLoggerInstance
) )

View File

@ -5,8 +5,8 @@ import (
"log" "log"
"testing" "testing"
"github.com/v2ray/v2ray-core/common/serial"
"github.com/v2ray/v2ray-core/common/platform" "github.com/v2ray/v2ray-core/common/platform"
"github.com/v2ray/v2ray-core/common/serial"
v2testing "github.com/v2ray/v2ray-core/testing" v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert" "github.com/v2ray/v2ray-core/testing/assert"
) )

View File

@ -1,11 +1,14 @@
package net package net
import ( import (
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/alloc"
) )
// Packet is a network packet to be sent to destination. // Packet is a network packet to be sent to destination.
type Packet interface { type Packet interface {
common.Releasable
Destination() Destination Destination() Destination
Chunk() *alloc.Buffer // First chunk of this commnunication Chunk() *alloc.Buffer // First chunk of this commnunication
MoreChunks() bool MoreChunks() bool
@ -37,3 +40,8 @@ func (packet *packetImpl) Chunk() *alloc.Buffer {
func (packet *packetImpl) MoreChunks() bool { func (packet *packetImpl) MoreChunks() bool {
return packet.moreData return packet.moreData
} }
func (packet *packetImpl) Release() {
packet.data.Release()
packet.data = nil
}

View File

@ -1,10 +1,7 @@
package blackhole package blackhole
import ( import (
"io/ioutil"
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
v2io "github.com/v2ray/v2ray-core/common/io"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal" "github.com/v2ray/v2ray-core/proxy/internal"
@ -20,14 +17,14 @@ func NewBlackHole() *BlackHole {
} }
func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
if chunk := firstPacket.Chunk(); chunk != nil { firstPacket.Release()
chunk.Release()
} ray.OutboundOutput().Close()
ray.OutboundOutput().Release()
ray.OutboundInput().Close()
ray.OutboundInput().Release()
close(ray.OutboundOutput())
if firstPacket.MoreChunks() {
v2io.ChanToRawWriter(ioutil.Discard, ray.OutboundInput())
}
return nil return nil
} }

View File

@ -1,7 +1,6 @@
package dokodemo package dokodemo
import ( import (
"io"
"sync" "sync"
"github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/app/dispatcher"
@ -126,25 +125,23 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) {
packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true) packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true)
ray := this.packetDispatcher.DispatchToOutbound(packet) ray := this.packetDispatcher.DispatchToOutbound(packet)
defer ray.InboundOutput().Release()
var inputFinish, outputFinish sync.Mutex var inputFinish, outputFinish sync.Mutex
inputFinish.Lock() inputFinish.Lock()
outputFinish.Lock() outputFinish.Lock()
reader := v2net.NewTimeOutReader(this.config.Timeout, conn) reader := v2net.NewTimeOutReader(this.config.Timeout, conn)
go dumpInput(reader, ray.InboundInput(), &inputFinish) go func() {
go dumpOutput(conn, ray.InboundOutput(), &outputFinish) v2io.Pipe(v2io.NewAdaptiveReader(reader), ray.InboundInput())
inputFinish.Unlock()
ray.InboundInput().Close()
}()
go func() {
v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(conn))
outputFinish.Unlock()
}()
outputFinish.Lock() outputFinish.Lock()
} }
func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
v2io.RawReaderToChan(input, reader)
finish.Unlock()
close(input)
}
func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
v2io.ChanToRawWriter(writer, output)
finish.Unlock()
}

View File

@ -43,7 +43,7 @@ func TestDokodemoTCP(t *testing.T) {
tcpClient.Write([]byte(data2Send)) tcpClient.Write([]byte(data2Send))
tcpClient.CloseWrite() tcpClient.CloseWrite()
lastPacket := <-testPacketDispatcher.LastPacket destination := <-testPacketDispatcher.Destination
response := make([]byte, 1024) response := make([]byte, 1024)
nBytes, err := tcpClient.Read(response) nBytes, err := tcpClient.Read(response)
@ -51,9 +51,9 @@ func TestDokodemoTCP(t *testing.T) {
tcpClient.Close() tcpClient.Close()
assert.StringLiteral("Processed: " + data2Send).Equals(string(response[:nBytes])) assert.StringLiteral("Processed: " + data2Send).Equals(string(response[:nBytes]))
assert.Bool(lastPacket.Destination().IsTCP()).IsTrue() assert.Bool(destination.IsTCP()).IsTrue()
netassert.Address(lastPacket.Destination().Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4})) netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4}))
netassert.Port(lastPacket.Destination().Port()).Equals(128) netassert.Port(destination.Port()).Equals(128)
} }
func TestDokodemoUDP(t *testing.T) { func TestDokodemoUDP(t *testing.T) {
@ -86,10 +86,9 @@ func TestDokodemoUDP(t *testing.T) {
udpClient.Write([]byte(data2Send)) udpClient.Write([]byte(data2Send))
udpClient.Close() udpClient.Close()
lastPacket := <-testPacketDispatcher.LastPacket destination := <-testPacketDispatcher.Destination
assert.StringLiteral(data2Send).Equals(string(lastPacket.Chunk().Value)) assert.Bool(destination.IsUDP()).IsTrue()
assert.Bool(lastPacket.Destination().IsUDP()).IsTrue() netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8}))
netassert.Address(lastPacket.Destination().Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8})) netassert.Port(destination.Port()).Equals(256)
netassert.Port(lastPacket.Destination().Port()).Equals(256)
} }

View File

@ -19,6 +19,9 @@ type FreedomConnection struct {
func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error { func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
log.Info("Freedom: Opening connection to ", firstPacket.Destination()) log.Info("Freedom: Opening connection to ", firstPacket.Destination())
defer firstPacket.Release()
defer ray.OutboundInput().Release()
var conn net.Conn var conn net.Conn
err := retry.Timed(5, 100).On(func() error { err := retry.Timed(5, 100).On(func() error {
rawConn, err := dialer.Dial(firstPacket.Destination()) rawConn, err := dialer.Dial(firstPacket.Destination())
@ -29,7 +32,6 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
return nil return nil
}) })
if err != nil { if err != nil {
close(ray.OutboundOutput())
log.Error("Freedom: Failed to open connection to ", firstPacket.Destination(), ": ", err) log.Error("Freedom: Failed to open connection to ", firstPacket.Destination(), ": ", err)
return err return err
} }
@ -43,21 +45,20 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
if chunk := firstPacket.Chunk(); chunk != nil { if chunk := firstPacket.Chunk(); chunk != nil {
conn.Write(chunk.Value) conn.Write(chunk.Value)
chunk.Release()
} }
if !firstPacket.MoreChunks() { if !firstPacket.MoreChunks() {
writeMutex.Unlock() writeMutex.Unlock()
} else { } else {
go func() { go func() {
v2io.ChanToRawWriter(conn, input) v2io.Pipe(input, v2io.NewAdaptiveWriter(conn))
writeMutex.Unlock() writeMutex.Unlock()
}() }()
} }
go func() { go func() {
defer readMutex.Unlock() defer readMutex.Unlock()
defer close(output) defer output.Close()
var reader io.Reader = conn var reader io.Reader = conn
@ -65,7 +66,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
reader = v2net.NewTimeOutReader(16 /* seconds */, conn) reader = v2net.NewTimeOutReader(16 /* seconds */, conn)
} }
v2io.RawReaderToChan(output, reader) v2io.Pipe(v2io.NewAdaptiveReader(reader), output)
}() }()
writeMutex.Lock() writeMutex.Lock()

View File

@ -37,15 +37,12 @@ func TestSinglePacket(t *testing.T) {
err = freedom.Dispatch(packet, traffic) err = freedom.Dispatch(packet, traffic)
assert.Error(err).IsNil() assert.Error(err).IsNil()
close(traffic.InboundInput()) traffic.InboundInput().Close()
respPayload := <-traffic.InboundOutput() respPayload, err := traffic.InboundOutput().Read()
defer respPayload.Release() assert.Error(err).IsNil()
assert.Bytes(respPayload.Value).Equals([]byte("Processed: Data to be sent to remote")) assert.Bytes(respPayload.Value).Equals([]byte("Processed: Data to be sent to remote"))
_, open := <-traffic.InboundOutput()
assert.Bool(open).IsFalse()
tcpServer.Close() tcpServer.Close()
} }
@ -60,7 +57,4 @@ func TestUnreachableDestination(t *testing.T) {
err := freedom.Dispatch(packet, traffic) err := freedom.Dispatch(packet, traffic)
assert.Error(err).IsNotNil() assert.Error(err).IsNotNil()
_, open := <-traffic.InboundOutput()
assert.Bool(open).IsFalse()
} }

View File

@ -4,15 +4,16 @@ import (
"io" "io"
"github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io"
) )
type ChanReader struct { type ChanReader struct {
stream <-chan *alloc.Buffer stream v2io.Reader
current *alloc.Buffer current *alloc.Buffer
eof bool eof bool
} }
func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader { func NewChanReader(stream v2io.Reader) *ChanReader {
this := &ChanReader{ this := &ChanReader{
stream: stream, stream: stream,
} }
@ -21,9 +22,9 @@ func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader {
} }
func (this *ChanReader) fill() { func (this *ChanReader) fill() {
b, open := <-this.stream b, err := this.stream.Read()
this.current = b this.current = b
if !open { if err != nil {
this.eof = true this.eof = true
this.current = nil this.current = nil
} }

View File

@ -154,13 +154,14 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra
defer wg.Wait() defer wg.Wait()
go func() { go func() {
v2io.RawReaderToChan(ray.InboundInput(), input) v2io.Pipe(v2io.NewAdaptiveReader(input), ray.InboundInput())
close(ray.InboundInput()) ray.InboundInput().Close()
wg.Done() wg.Done()
}() }()
go func() { go func() {
v2io.ChanToRawWriter(output, ray.InboundOutput()) v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(output))
ray.InboundOutput().Release()
wg.Done() wg.Done()
}() }()
} }
@ -222,7 +223,7 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
packet := v2net.NewPacket(dest, requestBuffer, true) packet := v2net.NewPacket(dest, requestBuffer, true)
ray := this.packetDispatcher.DispatchToOutbound(packet) ray := this.packetDispatcher.DispatchToOutbound(packet)
defer close(ray.InboundInput()) defer ray.InboundInput().Close()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)

View File

@ -204,7 +204,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
var writeFinish sync.Mutex var writeFinish sync.Mutex
writeFinish.Lock() writeFinish.Lock()
go func() { go func() {
if payload, ok := <-ray.InboundOutput(); ok { if payload, err := ray.InboundOutput().Read(); err == nil {
payload.SliceBack(ivLen) payload.SliceBack(ivLen)
rand.Read(payload.Value[:ivLen]) rand.Read(payload.Value[:ivLen])
@ -219,7 +219,8 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
payload.Release() payload.Release()
writer := crypto.NewCryptionWriter(stream, conn) writer := crypto.NewCryptionWriter(stream, conn)
v2io.ChanToRawWriter(writer, ray.InboundOutput()) v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(writer))
ray.InboundOutput().Release()
} }
writeFinish.Unlock() writeFinish.Unlock()
}() }()
@ -232,8 +233,8 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
payloadReader = v2io.NewAdaptiveReader(reader) payloadReader = v2io.NewAdaptiveReader(reader)
} }
v2io.ReaderToChan(ray.InboundInput(), payloadReader) v2io.Pipe(payloadReader, ray.InboundInput())
close(ray.InboundInput()) ray.InboundInput().Close()
payloadReader.Release() payloadReader.Release()
writeFinish.Lock() writeFinish.Lock()

View File

@ -276,14 +276,15 @@ func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPack
outputFinish.Lock() outputFinish.Lock()
go func() { go func() {
v2io.RawReaderToChan(input, reader) v2io.Pipe(v2io.NewAdaptiveReader(reader), input)
inputFinish.Unlock() inputFinish.Unlock()
close(input) input.Close()
}() }()
go func() { go func() {
v2io.ChanToRawWriter(writer, output) v2io.Pipe(output, v2io.NewAdaptiveWriter(writer))
outputFinish.Unlock() outputFinish.Unlock()
output.Release()
}() }()
outputFinish.Lock() outputFinish.Lock()
} }

View File

@ -42,13 +42,14 @@ func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
writeFinish.Lock() writeFinish.Lock()
go func() { go func() {
v2io.RawReaderToChan(input, this.ConnInput) v2io.Pipe(v2io.NewAdaptiveReader(this.ConnInput), input)
close(input) input.Close()
readFinish.Unlock() readFinish.Unlock()
}() }()
go func() { go func() {
v2io.ChanToRawWriter(this.ConnOutput, output) v2io.Pipe(output, v2io.NewAdaptiveWriter(this.ConnOutput))
output.Release()
writeFinish.Unlock() writeFinish.Unlock()
}() }()

View File

@ -33,15 +33,16 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out
writeFinish.Lock() writeFinish.Lock()
go func() { go func() {
v2io.ChanToRawWriter(this.ConnOutput, input) v2io.Pipe(input, v2io.NewAdaptiveWriter(this.ConnOutput))
writeFinish.Unlock() writeFinish.Unlock()
input.Release()
}() }()
writeFinish.Lock() writeFinish.Lock()
} }
v2io.RawReaderToChan(output, this.ConnInput) v2io.Pipe(v2io.NewAdaptiveReader(this.ConnInput), output)
close(output) output.Close()
return nil return nil
} }

View File

@ -145,7 +145,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
connReader.SetTimeOut(userSettings.PayloadReadTimeout) connReader.SetTimeOut(userSettings.PayloadReadTimeout)
reader.SetCached(false) reader.SetCached(false)
go func() { go func() {
defer close(input) defer input.Close()
defer readFinish.Unlock() defer readFinish.Unlock()
bodyReader := session.DecodeRequestBody(reader) bodyReader := session.DecodeRequestBody(reader)
var requestReader v2io.Reader var requestReader v2io.Reader
@ -154,7 +154,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
} else { } else {
requestReader = v2io.NewAdaptiveReader(bodyReader) requestReader = v2io.NewAdaptiveReader(bodyReader)
} }
v2io.ReaderToChan(input, requestReader) v2io.Pipe(requestReader, input)
requestReader.Release() requestReader.Release()
}() }()
@ -170,7 +170,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
bodyWriter := session.EncodeResponseBody(writer) bodyWriter := session.EncodeResponseBody(writer)
// Optimize for small response packet // Optimize for small response packet
if data, open := <-output; open { if data, err := output.Read(); err == nil {
if request.Option.IsChunkStream() { if request.Option.IsChunkStream() {
vmessio.Authenticate(data) vmessio.Authenticate(data)
} }
@ -183,7 +183,8 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
if request.Option.IsChunkStream() { if request.Option.IsChunkStream() {
writer = vmessio.NewAuthChunkWriter(writer) writer = vmessio.NewAuthChunkWriter(writer)
} }
v2io.ChanToWriter(writer, output) v2io.Pipe(output, writer)
output.Release()
writer.Release() writer.Release()
finish.Unlock() finish.Unlock()
}(&writeFinish) }(&writeFinish)

View File

@ -5,7 +5,6 @@ import (
"sync" "sync"
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
@ -60,7 +59,7 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader,
if err != nil { if err != nil {
log.Error("Failed to open ", dest, ": ", err) log.Error("Failed to open ", dest, ": ", err)
if ray != nil { if ray != nil {
close(ray.OutboundOutput()) ray.OutboundOutput().Close()
} }
return err return err
} }
@ -83,10 +82,12 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader,
requestFinish.Lock() requestFinish.Lock()
conn.CloseWrite() conn.CloseWrite()
responseFinish.Lock() responseFinish.Lock()
output.Close()
input.Release()
return nil return nil
} }
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input <-chan *alloc.Buffer, finish *sync.Mutex) { func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input v2io.Reader, finish *sync.Mutex) {
defer finish.Unlock() defer finish.Unlock()
writer := v2io.NewBufferedWriter(conn) writer := v2io.NewBufferedWriter(conn)
@ -97,15 +98,6 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
firstChunk := firstPacket.Chunk() firstChunk := firstPacket.Chunk()
moreChunks := firstPacket.MoreChunks() moreChunks := firstPacket.MoreChunks()
for firstChunk == nil && moreChunks {
firstChunk, moreChunks = <-input
}
if firstChunk == nil && !moreChunks {
log.Warning("VMessOut: Nothing to send. Existing...")
return
}
if request.Option.IsChunkStream() { if request.Option.IsChunkStream() {
vmessio.Authenticate(firstChunk) vmessio.Authenticate(firstChunk)
} }
@ -121,15 +113,14 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
if request.Option.IsChunkStream() { if request.Option.IsChunkStream() {
streamWriter = vmessio.NewAuthChunkWriter(streamWriter) streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
} }
v2io.ChanToWriter(streamWriter, input) v2io.Pipe(input, streamWriter)
streamWriter.Release() streamWriter.Release()
} }
return return
} }
func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output chan<- *alloc.Buffer, finish *sync.Mutex) { func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
defer finish.Unlock() defer finish.Unlock()
defer close(output)
reader := v2io.NewBufferedReader(conn) reader := v2io.NewBufferedReader(conn)
defer reader.Release() defer reader.Release()
@ -151,7 +142,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
bodyReader = v2io.NewAdaptiveReader(decryptReader) bodyReader = v2io.NewAdaptiveReader(decryptReader)
} }
v2io.ReaderToChan(output, bodyReader) v2io.Pipe(bodyReader, output)
bodyReader.Release() bodyReader.Release()
return return

View File

@ -199,13 +199,17 @@ func (this *Point) FilterPacketAndDispatch(packet v2net.Packet, link ray.Outboun
chunk := packet.Chunk() chunk := packet.Chunk()
moreChunks := packet.MoreChunks() moreChunks := packet.MoreChunks()
changed := false changed := false
var err error
for chunk == nil && moreChunks { for chunk == nil && moreChunks {
changed = true changed = true
chunk, moreChunks = <-link.OutboundInput() chunk, err = link.OutboundInput().Read()
if err != nil {
moreChunks = false
}
} }
if chunk == nil && !moreChunks { if chunk == nil && !moreChunks {
log.Info("Point: No payload to dispatch, stopping dispatching now.") log.Info("Point: No payload to dispatch, stopping dispatching now.")
close(link.OutboundOutput()) link.OutboundOutput().Close()
return return
} }

View File

@ -32,7 +32,7 @@ func (this *UDPServer) locateExistingAndDispatch(dest string, packet v2net.Packe
this.RLock() this.RLock()
defer this.RUnlock() defer this.RUnlock()
if entry, found := this.conns[dest]; found { if entry, found := this.conns[dest]; found {
entry.inboundRay.InboundInput() <- packet.Chunk() entry.inboundRay.InboundInput().Write(packet.Chunk())
return true return true
} }
return false return false
@ -55,8 +55,12 @@ func (this *UDPServer) Dispatch(source v2net.Destination, packet v2net.Packet, c
} }
func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) { func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) {
for buffer := range inboundRay.InboundOutput() { for {
callback(v2net.NewPacket(source, buffer, false)) data, err := inboundRay.InboundOutput().Read()
if err != nil {
break
}
callback(v2net.NewPacket(source, data, false))
} }
this.Lock() this.Lock()
delete(this.conns, destString) delete(this.conns, destString)

View File

@ -1,6 +1,9 @@
package ray package ray
import ( import (
"io"
"sync"
"github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/alloc"
) )
@ -11,28 +14,101 @@ const (
// NewRay creates a new Ray for direct traffic transport. // NewRay creates a new Ray for direct traffic transport.
func NewRay() Ray { func NewRay() Ray {
return &directRay{ return &directRay{
Input: make(chan *alloc.Buffer, bufferSize), Input: NewStream(),
Output: make(chan *alloc.Buffer, bufferSize), Output: NewStream(),
} }
} }
type directRay struct { type directRay struct {
Input chan *alloc.Buffer Input *Stream
Output chan *alloc.Buffer Output *Stream
} }
func (this *directRay) OutboundInput() <-chan *alloc.Buffer { func (this *directRay) OutboundInput() InputStream {
return this.Input return this.Input
} }
func (this *directRay) OutboundOutput() chan<- *alloc.Buffer { func (this *directRay) OutboundOutput() OutputStream {
return this.Output return this.Output
} }
func (this *directRay) InboundInput() chan<- *alloc.Buffer { func (this *directRay) InboundInput() OutputStream {
return this.Input return this.Input
} }
func (this *directRay) InboundOutput() <-chan *alloc.Buffer { func (this *directRay) InboundOutput() InputStream {
return this.Output return this.Output
} }
type Stream struct {
access sync.RWMutex
closed bool
buffer chan *alloc.Buffer
}
func NewStream() *Stream {
return &Stream{
buffer: make(chan *alloc.Buffer, bufferSize),
}
}
func (this *Stream) Read() (*alloc.Buffer, error) {
if this.buffer == nil {
return nil, io.EOF
}
this.access.RLock()
defer this.access.RUnlock()
if this.buffer == nil {
return nil, io.EOF
}
result, open := <-this.buffer
if !open {
return nil, io.EOF
}
return result, nil
}
func (this *Stream) Write(data *alloc.Buffer) error {
if this.closed {
return io.EOF
}
if this.buffer == nil {
return io.EOF
}
this.access.RLock()
defer this.access.RUnlock()
if this.buffer == nil {
return io.EOF
}
this.buffer <- data
return nil
}
func (this *Stream) Close() {
if this.closed {
return
}
this.access.RLock()
defer this.access.RUnlock()
if this.closed {
return
}
this.closed = true
close(this.buffer)
}
func (this *Stream) Release() {
if this.buffer == nil {
return
}
this.Close()
this.access.Lock()
defer this.access.Unlock()
if this.buffer == nil {
return
}
for data := range this.buffer {
data.Release()
}
this.buffer = nil
}

View File

@ -1,19 +1,19 @@
package ray package ray
import ( import (
"github.com/v2ray/v2ray-core/common/alloc" v2io "github.com/v2ray/v2ray-core/common/io"
) )
// OutboundRay is a transport interface for outbound connections. // OutboundRay is a transport interface for outbound connections.
type OutboundRay interface { type OutboundRay interface {
// OutboundInput provides a stream for the input of the outbound connection. // OutboundInput provides a stream for the input of the outbound connection.
// The outbound connection shall write all the input until it is closed. // The outbound connection shall write all the input until it is closed.
OutboundInput() <-chan *alloc.Buffer OutboundInput() InputStream
// OutboundOutput provides a stream to retrieve the response from the // OutboundOutput provides a stream to retrieve the response from the
// outbound connection. The outbound connection shall close the channel // outbound connection. The outbound connection shall close the channel
// after all responses are receivced and put into the channel. // after all responses are receivced and put into the channel.
OutboundOutput() chan<- *alloc.Buffer OutboundOutput() OutputStream
} }
// InboundRay is a transport interface for inbound connections. // InboundRay is a transport interface for inbound connections.
@ -21,12 +21,12 @@ type InboundRay interface {
// InboundInput provides a stream to retrieve the request from client. // InboundInput provides a stream to retrieve the request from client.
// The inbound connection shall close the channel after the entire request // The inbound connection shall close the channel after the entire request
// is received and put into the channel. // is received and put into the channel.
InboundInput() chan<- *alloc.Buffer InboundInput() OutputStream
// InboudBound provides a stream of data for the inbound connection to write // InboudBound provides a stream of data for the inbound connection to write
// as response. The inbound connection shall write all the data from the // as response. The inbound connection shall write all the data from the
// channel until it is closed. // channel until it is closed.
InboundOutput() <-chan *alloc.Buffer InboundOutput() InputStream
} }
// Ray is an internal tranport channel between inbound and outbound connection. // Ray is an internal tranport channel between inbound and outbound connection.
@ -34,3 +34,13 @@ type Ray interface {
InboundRay InboundRay
OutboundRay OutboundRay
} }
type InputStream interface {
v2io.Reader
Close()
}
type OutputStream interface {
v2io.Writer
Close()
}