package ray

import (
	"io"
	"sync"
	"time"

	"v2ray.com/core/common/alloc"
)

const (
	// bufferSize is the capacity, in buffers, of each Stream's channel.
	bufferSize = 128
)

// NewRay creates a new Ray for direct traffic transport.
func NewRay() Ray {
	return &directRay{
		Input:  NewStream(),
		Output: NewStream(),
	}
}

// directRay is a Ray backed by two in-memory Streams, one per direction.
type directRay struct {
	Input  *Stream
	Output *Stream
}

// OutboundInput returns the Input stream as an InputStream.
func (r *directRay) OutboundInput() InputStream {
	return r.Input
}

// OutboundOutput returns the Output stream as an OutputStream.
func (r *directRay) OutboundOutput() OutputStream {
	return r.Output
}

// InboundInput returns the Input stream as an OutputStream.
func (r *directRay) InboundInput() OutputStream {
	return r.Input
}

// InboundOutput returns the Output stream as an InputStream.
func (r *directRay) InboundOutput() InputStream {
	return r.Output
}

// Stream is a bounded queue of buffers built on a channel.
//
// The closed flag and the buffer field are only read or written while access
// is held, so Read, Write, Close and Release may be called from different
// goroutines.
type Stream struct {
	access sync.RWMutex
	closed bool
	buffer chan *alloc.Buffer
}

// NewStream returns an open Stream able to hold bufferSize pending buffers.
func NewStream() *Stream {
	return &Stream{
		buffer: make(chan *alloc.Buffer, bufferSize),
	}
}

// Read returns the next buffer from the stream, blocking until one is
// available. It returns io.EOF once the stream has been closed and drained,
// or after Release has been called.
func (s *Stream) Read() (*alloc.Buffer, error) {
	// Snapshot the channel under the lock: Release sets the field to nil.
	s.access.RLock()
	channel := s.buffer
	s.access.RUnlock()

	if channel == nil {
		return nil, io.EOF
	}

	// The lock is not held while blocking, so Close and Release can proceed.
	result, open := <-channel
	if !open {
		return nil, io.EOF
	}
	return result, nil
}

// Write queues data on the stream, retrying for as long as the stream is open
// and full. It returns io.ErrClosedPipe if the stream is closed.
func (s *Stream) Write(data *alloc.Buffer) error {
	for {
		// TryWriteOnce checks the closed flag under the lock, so the loop
		// does not need an unsynchronised check of its own.
		err := s.TryWriteOnce(data)
		if err != io.ErrNoProgress {
			return err
		}
	}
}

// TryWriteOnce attempts to queue data, waiting at most two seconds for space.
// It returns nil on success, io.ErrClosedPipe if the stream is closed, and
// io.ErrNoProgress if the stream stayed full for the whole wait.
func (s *Stream) TryWriteOnce(data *alloc.Buffer) error {
	const timeout = 2 * time.Second

	// The read lock is held across the send so that Close, which takes the
	// write lock, cannot close the channel while a send is in flight.
	s.access.RLock()
	defer s.access.RUnlock()

	if s.closed {
		return io.ErrClosedPipe
	}

	// Fast path: when there is room, skip allocating a timer.
	select {
	case s.buffer <- data:
		return nil
	default:
	}

	// Stop the timer on return so it does not outlive a successful send.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case s.buffer <- data:
		return nil
	case <-timer.C:
		return io.ErrNoProgress
	}
}

// Close marks the stream closed and closes the underlying channel. Buffers
// already queued remain readable. Calling Close more than once is a no-op.
func (s *Stream) Close() {
	s.access.Lock()
	defer s.access.Unlock()

	if s.closed {
		return
	}
	s.closed = true
	close(s.buffer)
}

// Release closes the stream, calls Release on every buffer still queued, and
// drops the channel. Calling Release more than once is a no-op.
func (s *Stream) Release() {
	s.Close()

	s.access.Lock()
	defer s.access.Unlock()

	if s.buffer == nil {
		return
	}
	// The channel is closed by now, so this loop ends once it is drained.
	for data := range s.buffer {
		data.Release()
	}
	s.buffer = nil
}