1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-22 18:17:52 -05:00
v2fly/transport/pipe/impl.go

156 lines
2.4 KiB
Go
Raw Normal View History

2018-04-16 08:57:13 -04:00
package pipe
import (
2018-04-18 09:11:03 -04:00
"errors"
2018-04-16 08:57:13 -04:00
"io"
"sync"
"time"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/signal"
)
// state is the lifecycle stage of a pipe.
type state byte

// Lifecycle stages of a pipe, numbered from zero in declaration order.
const (
	// open is the zero value: the pipe has been neither closed nor errored.
	open state = iota
	// closed is the stage set by Close.
	closed
	// errord is the stage set by CloseError.
	errord
)
// pipe is a mutex-guarded in-memory buffer with a reading side
// (ReadMultiBuffer, ReadMultiBufferWithTimeout) and a writing side
// (WriteMultiBuffer).
type pipe struct {
	// Mutex is held by the methods in this file while they touch data or state.
	sync.Mutex
	// data holds what has been written but not yet read.
	data buf.MultiBuffer
	// readSignal is waited on by readers; it is signalled by WriteMultiBuffer,
	// Close and CloseError.
	readSignal *signal.Notifier
	// writeSignal is waited on by writers that hit errBufferFull; it is
	// signalled by the read methods, Close and CloseError.
	writeSignal *signal.Notifier
	// limit is the buffered length above which writes are refused with
	// errBufferFull. A negative value disables the check.
	limit int32
	// state is the current lifecycle stage (open, closed or errord).
	state state
}
2018-04-18 09:11:03 -04:00
// errBufferFull is reported by getState to a writer when the buffered data
// already exceeds the pipe's limit. WriteMultiBuffer handles it internally by
// waiting and retrying, so it is not returned to that method's callers.
var errBufferFull = errors.New("buffer full")
2018-04-16 08:57:13 -04:00
func (p *pipe) getState(forRead bool) error {
switch p.state {
case open:
2018-04-18 09:11:03 -04:00
if !forRead && p.limit >= 0 && p.data.Len() > p.limit {
return errBufferFull
}
2018-04-16 08:57:13 -04:00
return nil
case closed:
2018-04-23 10:26:29 -04:00
if !forRead {
return io.ErrClosedPipe
2018-04-16 08:57:13 -04:00
}
2018-04-23 10:26:29 -04:00
if !p.data.IsEmpty() {
return nil
}
return io.EOF
2018-04-16 08:57:13 -04:00
case errord:
return io.ErrClosedPipe
default:
panic("impossible case")
}
}
// readMultiBufferInternal makes one non-blocking read attempt under the lock.
// If getState rejects the read, its error is returned with a nil buffer.
// Otherwise everything currently buffered is handed to the caller and the
// pipe's buffer is reset to nil. The returned buffer may itself be nil when
// nothing has been written yet.
func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {
	p.Lock()
	defer p.Unlock()

	err := p.getState(true)
	if err != nil {
		return nil, err
	}

	pending := p.data
	p.data = nil
	return pending, nil
}
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
for {
data, err := p.readMultiBufferInternal()
if data != nil || err != nil {
2018-04-16 18:45:38 -04:00
p.writeSignal.Signal()
2018-04-16 08:57:13 -04:00
return data, err
}
<-p.readSignal.Wait()
}
}
func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
timer := time.After(d)
for {
data, err := p.readMultiBufferInternal()
if data != nil || err != nil {
p.writeSignal.Signal()
return data, err
}
select {
case <-p.readSignal.Wait():
case <-timer:
2018-04-16 18:31:10 -04:00
return nil, buf.ErrReadTimeout
2018-04-16 08:57:13 -04:00
}
}
}
// writeMultiBufferInternal makes one non-blocking write attempt under the
// lock. If getState rejects the write, its error is returned and mb is left
// untouched. Otherwise mb is appended to the pipe's buffer and nil is returned.
func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
	p.Lock()
	defer p.Unlock()

	err := p.getState(false)
	if err != nil {
		return err
	}

	p.data.AppendMulti(mb)
	return nil
}
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
if mb.IsEmpty() {
return nil
}
for {
2018-04-18 09:11:03 -04:00
err := p.writeMultiBufferInternal(mb)
if err == nil || err != errBufferFull {
p.readSignal.Signal()
return err
2018-04-16 08:57:13 -04:00
}
<-p.writeSignal.Wait()
}
}
func (p *pipe) Close() error {
p.Lock()
defer p.Unlock()
2018-04-16 18:31:10 -04:00
if p.state == closed || p.state == errord {
return nil
}
2018-04-16 08:57:13 -04:00
p.state = closed
p.readSignal.Signal()
p.writeSignal.Signal()
return nil
}
func (p *pipe) CloseError() {
p.Lock()
defer p.Unlock()
2018-04-23 10:26:29 -04:00
if p.state == closed || p.state == errord {
return
}
2018-04-16 08:57:13 -04:00
2018-04-23 10:42:37 -04:00
p.state = errord
2018-04-16 08:57:13 -04:00
if !p.data.IsEmpty() {
p.data.Release()
p.data = nil
}
p.readSignal.Signal()
p.writeSignal.Signal()
}