v2fly/transport/pipe/impl.go

205 lines
3.4 KiB
Go
Raw Normal View History

2018-04-16 12:57:13 +00:00
package pipe
import (
2018-04-18 13:11:03 +00:00
"errors"
2018-04-16 12:57:13 +00:00
"io"
2018-11-11 17:58:58 +00:00
"runtime"
2018-04-16 12:57:13 +00:00
"sync"
"time"
2021-02-16 20:31:50 +00:00
"github.com/v2fly/v2ray-core/v4/common"
"github.com/v2fly/v2ray-core/v4/common/buf"
"github.com/v2fly/v2ray-core/v4/common/signal"
"github.com/v2fly/v2ray-core/v4/common/signal/done"
2018-04-16 12:57:13 +00:00
)
// state is the lifecycle marker kept in pipe.state.
type state byte

const (
	// open is the zero value. getState accepts reads, and accepts writes
	// unless the configured buffer limit is exceeded.
	open state = iota
	// closed is set by Close. Writes fail with io.ErrClosedPipe; reads keep
	// succeeding while data is queued and report io.EOF once it is empty.
	closed
	// errord is set by Interrupt. Reads and writes both fail with
	// io.ErrClosedPipe.
	errord
)
2018-11-14 11:31:59 +00:00
// pipeOption holds the tunables of a pipe.
type pipeOption struct {
	// limit is the maximum buffer size in bytes. A negative value disables
	// the check in isFull.
	limit int32
	// discardOverflow makes WriteMultiBuffer release and drop a payload that
	// hits a full buffer instead of waiting for room.
	discardOverflow bool
}

// isFull reports whether curSize exceeds the configured limit.
//
// The comparison is strict, so a buffer holding exactly limit bytes is not
// considered full. A negative limit never reports full.
func (o *pipeOption) isFull(curSize int32) bool {
	if o.limit < 0 {
		return false
	}
	return curSize > o.limit
}
2018-04-16 12:57:13 +00:00
type pipe struct {
sync.Mutex
2018-11-14 11:31:59 +00:00
data buf.MultiBuffer
readSignal *signal.Notifier
writeSignal *signal.Notifier
done *done.Instance
option pipeOption
state state
2018-04-16 12:57:13 +00:00
}
2021-05-19 21:28:52 +00:00
// Internal sentinels passed from the locked helpers to WriteMultiBuffer.
// WriteMultiBuffer compares against them by identity and never returns them
// to its caller.
var (
	// errBufferFull is returned by getState for a write while the queued
	// data exceeds the configured limit.
	errBufferFull = errors.New("buffer full")
	// errSlowDown is returned by writeMultiBufferInternal when the payload
	// was accepted but had to be merged behind data that is still unread.
	errSlowDown = errors.New("slow down")
)
2018-04-18 13:11:03 +00:00
2018-04-16 12:57:13 +00:00
func (p *pipe) getState(forRead bool) error {
switch p.state {
case open:
2018-11-14 11:31:59 +00:00
if !forRead && p.option.isFull(p.data.Len()) {
2018-04-18 13:11:03 +00:00
return errBufferFull
}
2018-04-16 12:57:13 +00:00
return nil
case closed:
2018-04-23 14:26:29 +00:00
if !forRead {
return io.ErrClosedPipe
2018-04-16 12:57:13 +00:00
}
2018-04-23 14:26:29 +00:00
if !p.data.IsEmpty() {
return nil
}
return io.EOF
2018-04-16 12:57:13 +00:00
case errord:
return io.ErrClosedPipe
default:
panic("impossible case")
}
}
// readMultiBufferInternal takes everything currently queued in one step,
// under the mutex.
//
// It returns the error from getState(true) if there is one. Otherwise it
// hands back the whole queue and leaves the pipe empty. A (nil, nil) result
// means the pipe is readable but nothing has been written yet.
func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {
	p.Lock()
	defer p.Unlock()

	err := p.getState(true)
	if err != nil {
		return nil, err
	}

	pending := p.data
	p.data = nil
	return pending, nil
}
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
for {
data, err := p.readMultiBufferInternal()
if data != nil || err != nil {
2018-04-16 22:45:38 +00:00
p.writeSignal.Signal()
2018-04-16 12:57:13 +00:00
return data, err
}
2018-06-09 02:47:37 +00:00
select {
case <-p.readSignal.Wait():
case <-p.done.Wait():
}
2018-04-16 12:57:13 +00:00
}
}
func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
2018-08-09 10:23:27 +00:00
timer := time.NewTimer(d)
defer timer.Stop()
2018-04-16 12:57:13 +00:00
for {
data, err := p.readMultiBufferInternal()
if data != nil || err != nil {
p.writeSignal.Signal()
return data, err
}
select {
case <-p.readSignal.Wait():
2018-06-09 02:47:37 +00:00
case <-p.done.Wait():
2018-08-09 10:23:27 +00:00
case <-timer.C:
2018-04-16 22:31:10 +00:00
return nil, buf.ErrReadTimeout
2018-04-16 12:57:13 +00:00
}
}
}
func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
p.Lock()
defer p.Unlock()
if err := p.getState(false); err != nil {
return err
}
2018-11-17 18:47:30 +00:00
if p.data == nil {
p.data = mb
return nil
2018-11-17 18:47:30 +00:00
}
p.data, _ = buf.MergeMulti(p.data, mb)
return errSlowDown
2018-04-16 12:57:13 +00:00
}
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
if mb.IsEmpty() {
return nil
}
for {
2018-04-18 13:11:03 +00:00
err := p.writeMultiBufferInternal(mb)
if err == nil {
p.readSignal.Signal()
return nil
}
if err == errSlowDown {
2018-08-31 10:35:08 +00:00
p.readSignal.Signal()
2018-11-11 17:58:58 +00:00
// Yield current goroutine. Hopefully the reading counterpart can pick up the payload.
runtime.Gosched()
2018-08-31 10:35:08 +00:00
return nil
}
if err == errBufferFull && p.option.discardOverflow {
2018-11-17 21:45:07 +00:00
buf.ReleaseMulti(mb)
2018-09-02 22:56:43 +00:00
return nil
}
if err != errBufferFull {
2018-11-17 21:45:07 +00:00
buf.ReleaseMulti(mb)
2018-04-18 13:11:03 +00:00
p.readSignal.Signal()
return err
2018-04-16 12:57:13 +00:00
}
2018-06-09 02:47:37 +00:00
select {
case <-p.writeSignal.Wait():
case <-p.done.Wait():
return io.ErrClosedPipe
}
2018-04-16 12:57:13 +00:00
}
}
func (p *pipe) Close() error {
p.Lock()
defer p.Unlock()
2018-04-16 22:31:10 +00:00
if p.state == closed || p.state == errord {
return nil
}
2018-04-16 12:57:13 +00:00
p.state = closed
2018-08-31 10:12:40 +00:00
common.Must(p.done.Close())
2018-04-16 12:57:13 +00:00
return nil
}
2018-12-31 20:25:10 +00:00
// Interrupt implements common.Interruptible.
func (p *pipe) Interrupt() {
2018-04-16 12:57:13 +00:00
p.Lock()
defer p.Unlock()
2018-04-23 14:26:29 +00:00
if p.state == closed || p.state == errord {
return
}
2018-04-16 12:57:13 +00:00
2018-04-23 14:42:37 +00:00
p.state = errord
2018-04-16 12:57:13 +00:00
if !p.data.IsEmpty() {
2018-11-17 21:45:07 +00:00
buf.ReleaseMulti(p.data)
2018-04-16 12:57:13 +00:00
p.data = nil
}
2018-08-31 10:12:40 +00:00
common.Must(p.done.Close())
2018-04-16 12:57:13 +00:00
}