From 5d236047138beb60adfb6647f866ac2535012beb Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Tue, 17 Apr 2018 00:31:10 +0200 Subject: [PATCH] rewrite ray -> pipe --- app/commander/outbound.go | 13 +- app/dispatcher/default.go | 85 +++++-- app/proxyman/mux/mux.go | 81 +++--- app/proxyman/mux/session.go | 14 +- app/proxyman/outbound/handler.go | 25 +- common/buf/multi_buffer.go | 6 + {transport/ray => common/net}/connection.go | 95 ++++--- common/stats/io.go | 26 ++ dial.go | 3 +- network.go | 3 +- proxy/blackhole/blackhole.go | 9 +- proxy/dokodemo/dokodemo.go | 13 +- proxy/freedom/freedom.go | 9 +- proxy/http/server.go | 30 +-- proxy/proxy.go | 3 +- proxy/shadowsocks/client.go | 11 +- proxy/shadowsocks/server.go | 15 +- proxy/socks/client.go | 11 +- proxy/socks/server.go | 16 +- proxy/vmess/inbound/inbound.go | 19 +- proxy/vmess/outbound/outbound.go | 13 +- router.go | 12 +- transport/internet/udp/dispatcher.go | 24 +- transport/internet/udp/dispatcher_test.go | 19 +- transport/pipe/impl.go | 9 +- transport/pipe/pipe.go | 10 + transport/ray/direct.go | 268 -------------------- transport/ray/direct_test.go | 49 ---- transport/ray/ray.go | 54 ---- 29 files changed, 347 insertions(+), 598 deletions(-) rename {transport/ray => common/net}/connection.go (53%) create mode 100644 common/stats/io.go delete mode 100644 transport/ray/direct.go delete mode 100644 transport/ray/direct_test.go delete mode 100644 transport/ray/ray.go diff --git a/app/commander/outbound.go b/app/commander/outbound.go index 3a874921d..7b46f9791 100644 --- a/app/commander/outbound.go +++ b/app/commander/outbound.go @@ -2,12 +2,13 @@ package commander import ( "context" - "net" "sync" + "v2ray.com/core" "v2ray.com/core/common" + "v2ray.com/core/common/net" "v2ray.com/core/common/signal" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" ) // OutboundListener is a net.Listener for listening gRPC connections. @@ -68,18 +69,18 @@ type Outbound struct { } // Dispatch implements core.OutboundHandler. -func (co *Outbound) Dispatch(ctx context.Context, r ray.OutboundRay) { +func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) { co.access.RLock() if co.closed { - r.OutboundInput().CloseError() - r.OutboundOutput().CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) co.access.RUnlock() return } closeSignal := signal.NewNotifier() - c := ray.NewConnection(r.OutboundInput(), r.OutboundOutput(), ray.ConnCloseSignal(closeSignal)) + c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(closeSignal)) co.listener.add(c) co.access.RUnlock() <-closeSignal.Wait() diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index 503d080b6..5ffea14f4 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -12,14 +12,45 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/stats" "v2ray.com/core/proxy" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" ) var ( errSniffingTimeout = newError("timeout on sniffing") ) +type cachedReader struct { + reader *pipe.Reader + cache buf.MultiBuffer +} + +func (r *cachedReader) Cache(b *buf.Buffer) { + mb, _ := r.reader.ReadMultiBufferWithTimeout(time.Millisecond * 100) + if !mb.IsEmpty() { + r.cache.WriteMultiBuffer(mb) + } + common.Must(b.Reset(func(x []byte) (int, error) { + return r.cache.Copy(x), nil + })) +} + +func (r *cachedReader) ReadMultiBuffer() (buf.MultiBuffer, error) { + if !r.cache.IsEmpty() { + mb := r.cache + r.cache = nil + return mb, nil + } + + return r.reader.ReadMultiBuffer() +} + +func (r *cachedReader) CloseError() { + r.cache.Release() + r.reader.CloseError() +} + // DefaultDispatcher is a default implementation of Dispatcher. type DefaultDispatcher struct { ohm core.OutboundHandlerManager @@ -52,45 +83,64 @@ func (*DefaultDispatcher) Start() error { // Close implements common.Closable. func (*DefaultDispatcher) Close() error { return nil } -func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option { - var rayOptions []ray.Option +func (d *DefaultDispatcher) getLink(ctx context.Context) (*core.Link, *core.Link) { + uplinkReader, uplinkWriter := pipe.New() + downlinkReader, downlinkWriter := pipe.New() + inboundLink := &core.Link{ + Reader: downlinkReader, + Writer: uplinkWriter, + } + + outboundLink := &core.Link{ + Reader: uplinkReader, + Writer: downlinkWriter, + } + + user := protocol.UserFromContext(ctx) if user != nil && len(user.Email) > 0 { p := d.policy.ForLevel(user.Level) if p.Stats.UserUplink { name := "user>>>" + user.Email + ">>>traffic>>>uplink" if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil { - rayOptions = append(rayOptions, ray.WithUplinkStatCounter(c)) + inboundLink.Writer = &stats.SizeStatWriter{ + Counter: c, + Writer: inboundLink.Writer, + } } } if p.Stats.UserDownlink { name := "user>>>" + user.Email + ">>>traffic>>>downlink" if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil { - rayOptions = append(rayOptions, ray.WithDownlinkStatCounter(c)) + outboundLink.Writer = &stats.SizeStatWriter{ + Counter: c, + Writer: outboundLink.Writer, + } } } } - return rayOptions + return inboundLink, outboundLink } // Dispatch implements core.Dispatcher. -func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (ray.InboundRay, error) { +func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*core.Link, error) { if !destination.IsValid() { panic("Dispatcher: Invalid destination.") } ctx = proxy.ContextWithTarget(ctx, destination) - user := protocol.UserFromContext(ctx) - rayOptions := d.getRayOption(user) - - outbound := ray.New(ctx, rayOptions...) + inbound, outbound := d.getLink(ctx) snifferList := proxyman.ProtocolSniffersFromContext(ctx) if destination.Address.Family().IsDomain() || len(snifferList) == 0 { go d.routedDispatch(ctx, outbound, destination) } else { go func() { - domain, err := sniffer(ctx, snifferList, outbound) + cReader := &cachedReader{ + reader: outbound.Reader.(*pipe.Reader), + } + outbound.Reader = cReader + domain, err := sniffer(ctx, snifferList, cReader) if err == nil { newError("sniffed domain: ", domain).WithContext(ctx).WriteToLog() destination.Address = net.ParseAddress(domain) @@ -99,10 +149,10 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin d.routedDispatch(ctx, outbound, destination) }() } - return outbound, nil + return inbound, nil } -func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, outbound ray.OutboundRay) (string, error) { +func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, cReader *cachedReader) (string, error) { payload := buf.New() defer payload.Release() @@ -117,7 +167,8 @@ func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, outboun if totalAttempt > 5 { return "", errSniffingTimeout } - outbound.OutboundInput().Peek(payload) + + cReader.Cache(payload) if !payload.IsEmpty() { domain, err := sniffer.Sniff(payload.Bytes()) if err != ErrMoreData { @@ -132,7 +183,7 @@ func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, outboun } } -func (d *DefaultDispatcher) routedDispatch(ctx context.Context, outbound ray.OutboundRay, destination net.Destination) { +func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link, destination net.Destination) { dispatcher := d.ohm.GetDefaultHandler() if d.router != nil { if tag, err := d.router.PickRoute(ctx); err == nil { @@ -146,7 +197,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, outbound ray.Out newError("default route for ", destination).WithContext(ctx).WriteToLog() } } - dispatcher.Dispatch(ctx, outbound) + dispatcher.Dispatch(ctx, link) } func init() { diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index ce9bea438..f10a3c2ad 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -18,7 +18,7 @@ import ( "v2ray.com/core/common/protocol" "v2ray.com/core/common/signal" "v2ray.com/core/proxy" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" ) const ( @@ -41,12 +41,12 @@ func NewClientManager(p proxy.Outbound, d proxy.Dialer, c *proxyman.Multiplexing } } -func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) error { +func (m *ClientManager) Dispatch(ctx context.Context, link *core.Link) error { m.access.Lock() defer m.access.Unlock() for _, client := range m.clients { - if client.Dispatch(ctx, outboundRay) { + if client.Dispatch(ctx, link) { return nil } } @@ -56,7 +56,7 @@ func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRa return newError("failed to create client").Base(err) } m.clients = append(m.clients, client) - client.Dispatch(ctx, outboundRay) + client.Dispatch(ctx, link) return nil } @@ -76,7 +76,7 @@ func (m *ClientManager) onClientFinish() { type Client struct { sessionManager *SessionManager - inboundRay ray.InboundRay + link core.Link done *signal.Done manager *ClientManager concurrency uint32 @@ -89,18 +89,22 @@ var muxCoolPort = net.Port(9527) func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) { ctx := proxy.ContextWithTarget(context.Background(), net.TCPDestination(muxCoolAddress, muxCoolPort)) ctx, cancel := context.WithCancel(ctx) - pipe := ray.New(ctx) + uplinkReader, upLinkWriter := pipe.New() + downlinkReader, downlinkWriter := pipe.New() c := &Client{ sessionManager: NewSessionManager(), - inboundRay: pipe, - done: signal.NewDone(), - manager: m, - concurrency: m.config.Concurrency, + link: core.Link{ + Reader: downlinkReader, + Writer: upLinkWriter, + }, + done: signal.NewDone(), + manager: m, + concurrency: m.config.Concurrency, } go func() { - if err := p.Process(ctx, pipe, dialer); err != nil { + if err := p.Process(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil { errors.New("failed to handler mux client connection").Base(err).WriteToLog() } c.done.Close() @@ -127,8 +131,8 @@ func (m *Client) monitor() { select { case <-m.done.Wait(): m.sessionManager.Close() - m.inboundRay.InboundInput().Close() - m.inboundRay.InboundOutput().CloseError() + common.Close(m.link.Writer) + pipe.CloseError(m.link.Reader) return case <-timer.C: size := m.sessionManager.Size() @@ -159,7 +163,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { writer.Close() } -func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool { +func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool { sm := m.sessionManager if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal { return false @@ -173,9 +177,9 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool if s == nil { return false } - s.input = outboundRay.OutboundInput() - s.output = outboundRay.OutboundOutput() - go fetchInput(ctx, s, m.inboundRay.InboundInput()) + s.input = link.Reader + s.output = link.Writer + go fetchInput(ctx, s, m.link.Writer) return true } @@ -205,7 +209,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade if s, found := m.sessionManager.Get(meta.SessionID); found { if err := buf.Copy(s.NewReader(reader), s.output); err != nil { drain(reader) - s.input.CloseError() + pipe.CloseError(s.input) return s.Close() } return nil @@ -216,8 +220,8 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error { if s, found := m.sessionManager.Get(meta.SessionID); found { if meta.Option.Has(OptionError) { - s.input.CloseError() - s.output.CloseError() + pipe.CloseError(s.input) + pipe.CloseError(s.output) } s.Close() } @@ -230,7 +234,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader func (m *Client) fetchOutput() { defer m.done.Close() - reader := buf.NewBufferedReader(m.inboundRay.InboundOutput()) + reader := buf.NewBufferedReader(m.link.Reader) for { meta, err := ReadMetadata(reader) @@ -274,19 +278,24 @@ func NewServer(ctx context.Context) *Server { return s } -func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) { +func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) { if dest.Address != muxCoolAddress { return s.dispatcher.Dispatch(ctx, dest) } - ray := ray.New(ctx) + uplinkReader, uplinkWriter := pipe.New() + downlinkReader, downlinkWriter := pipe.New() + worker := &ServerWorker{ - dispatcher: s.dispatcher, - outboundRay: ray, + dispatcher: s.dispatcher, + link: &core.Link{ + Reader: uplinkReader, + Writer: downlinkWriter, + }, sessionManager: NewSessionManager(), } go worker.run(ctx) - return ray, nil + return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil } func (s *Server) Start() error { @@ -299,7 +308,7 @@ func (s *Server) Close() error { type ServerWorker struct { dispatcher core.Dispatcher - outboundRay ray.OutboundRay + link *core.Link sessionManager *SessionManager } @@ -334,7 +343,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, } log.Record(msg) } - inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) + link, err := w.dispatcher.Dispatch(ctx, meta.Target) if err != nil { if meta.Option.Has(OptionData) { drain(reader) @@ -342,8 +351,8 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, return newError("failed to dispatch request.").Base(err) } s := &Session{ - input: inboundRay.InboundOutput(), - output: inboundRay.InboundInput(), + input: link.Reader, + output: link.Writer, parent: w.sessionManager, ID: meta.SessionID, transferType: protocol.TransferTypeStream, @@ -352,7 +361,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, s.transferType = protocol.TransferTypePacket } w.sessionManager.Add(s) - go handle(ctx, s, w.outboundRay.OutboundOutput()) + go handle(ctx, s, w.link.Writer) if meta.Option.Has(OptionData) { return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError()) } @@ -366,7 +375,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere if s, found := w.sessionManager.Get(meta.SessionID); found { if err := buf.Copy(s.NewReader(reader), s.output); err != nil { drain(reader) - s.input.CloseError() + pipe.CloseError(s.input) return s.Close() } return nil @@ -377,8 +386,8 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error { if s, found := w.sessionManager.Get(meta.SessionID); found { if meta.Option.Has(OptionError) { - s.input.CloseError() - s.output.CloseError() + pipe.CloseError(s.input) + pipe.CloseError(s.output) } s.Close() } @@ -414,7 +423,7 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead } func (w *ServerWorker) run(ctx context.Context) { - input := w.outboundRay.OutboundInput() + input := w.link.Reader reader := buf.NewBufferedReader(input) defer w.sessionManager.Close() @@ -428,7 +437,7 @@ func (w *ServerWorker) run(ctx context.Context) { if err != nil { if errors.Cause(err) != io.EOF { newError("unexpected EOF").Base(err).WithContext(ctx).WriteToLog() - input.CloseError() + pipe.CloseError(input) } return } diff --git a/app/proxyman/mux/session.go b/app/proxyman/mux/session.go index d42814c97..6dcccc1b7 100644 --- a/app/proxyman/mux/session.go +++ b/app/proxyman/mux/session.go @@ -3,9 +3,9 @@ package mux import ( "sync" + "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/protocol" - "v2ray.com/core/transport/ray" ) type SessionManager struct { @@ -118,8 +118,8 @@ func (m *SessionManager) Close() error { m.closed = true for _, s := range m.sessions { - s.input.Close() - s.output.Close() + common.Close(s.input) + common.Close(s.output) } m.sessions = nil @@ -128,8 +128,8 @@ func (m *SessionManager) Close() error { // Session represents a client connection in a Mux connection. type Session struct { - input ray.InputStream - output ray.OutputStream + input buf.Reader + output buf.Writer parent *SessionManager ID uint16 transferType protocol.TransferType @@ -137,8 +137,8 @@ type Session struct { // Close closes all resources associated with this session. func (s *Session) Close() error { - s.output.Close() - s.input.Close() + common.Close(s.output) + common.Close(s.input) s.parent.Remove(s.ID) return nil } diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 55205d401..b464fe6d6 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -10,7 +10,7 @@ import ( "v2ray.com/core/common/net" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" ) type Handler struct { @@ -74,21 +74,21 @@ func (h *Handler) Tag() string { } // Dispatch implements proxy.Outbound.Dispatch. -func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { +func (h *Handler) Dispatch(ctx context.Context, link *core.Link) { if h.mux != nil { - if err := h.mux.Dispatch(ctx, outboundRay); err != nil { + if err := h.mux.Dispatch(ctx, link); err != nil { newError("failed to process outbound traffic").Base(err).WithContext(ctx).WriteToLog() - outboundRay.OutboundOutput().CloseError() + pipe.CloseError(link.Writer) } } else { - if err := h.proxy.Process(ctx, outboundRay, h); err != nil { + if err := h.proxy.Process(ctx, link, h); err != nil { // Ensure outbound ray is properly closed. newError("failed to process outbound traffic").Base(err).WithContext(ctx).WriteToLog() - outboundRay.OutboundOutput().CloseError() + pipe.CloseError(link.Writer) } else { - outboundRay.OutboundOutput().Close() + common.Close(link.Writer) } - outboundRay.OutboundInput().CloseError() + pipe.CloseError(link.Reader) } } @@ -101,9 +101,12 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn if handler != nil { newError("proxying to ", tag, " for dest ", dest).AtDebug().WithContext(ctx).WriteToLog() ctx = proxy.ContextWithTarget(ctx, dest) - stream := ray.New(ctx) - go handler.Dispatch(ctx, stream) - return ray.NewConnection(stream.InboundOutput(), stream.InboundInput()), nil + + uplinkReader, uplinkWriter := pipe.New() + downlinkReader, downlinkWriter := pipe.New() + + go handler.Dispatch(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}) + return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil } newError("failed to get outbound handler with tag: ", tag).AtWarning().WithContext(ctx).WriteToLog() diff --git a/common/buf/multi_buffer.go b/common/buf/multi_buffer.go index 89fd05c74..9198b8a24 100644 --- a/common/buf/multi_buffer.go +++ b/common/buf/multi_buffer.go @@ -172,6 +172,12 @@ func (mb *MultiBuffer) Write(b []byte) (int, error) { return totalBytes, nil } +// WriteMultiBuffer implements Writer. +func (mb *MultiBuffer) WriteMultiBuffer(b MultiBuffer) error { + *mb = append(*mb, b...) + return nil +} + // Len returns the total number of bytes in the MultiBuffer. func (mb MultiBuffer) Len() int32 { size := int32(0) diff --git a/transport/ray/connection.go b/common/net/connection.go similarity index 53% rename from transport/ray/connection.go rename to common/net/connection.go index 27753ac8c..12998fbe8 100644 --- a/transport/ray/connection.go +++ b/common/net/connection.go @@ -1,69 +1,81 @@ -package ray +package net import ( "io" "net" "time" + "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/signal" ) type ConnectionOption func(*connection) -func ConnLocalAddr(addr net.Addr) ConnectionOption { +func ConnectionLocalAddr(a net.Addr) ConnectionOption { return func(c *connection) { - c.localAddr = addr + c.local = a } } -func ConnRemoteAddr(addr net.Addr) ConnectionOption { +func ConnectionRemoteAddr(a net.Addr) ConnectionOption { return func(c *connection) { - c.remoteAddr = addr + c.remote = a } } -func ConnCloseSignal(s *signal.Notifier) ConnectionOption { +func ConnectionInput(writer io.Writer) ConnectionOption { return func(c *connection) { - c.closeSignal = s + c.writer = buf.NewWriter(writer) } } -type connection struct { - input InputStream - output OutputStream - closed bool - localAddr net.Addr - remoteAddr net.Addr - closeSignal *signal.Notifier - - reader *buf.BufferedReader +func ConnectionInputMulti(writer buf.Writer) ConnectionOption { + return func(c *connection) { + c.writer = writer + } } -var zeroAddr net.Addr = &net.TCPAddr{IP: []byte{0, 0, 0, 0}} +func ConnectionOutput(reader io.Reader) ConnectionOption { + return func(c *connection) { + c.reader = buf.NewBufferedReader(buf.NewReader(reader)) + } +} -// NewConnection wraps a Ray into net.Conn. -func NewConnection(input InputStream, output OutputStream, options ...ConnectionOption) net.Conn { +func ConnectionOutputMulti(reader buf.Reader) ConnectionOption { + return func(c *connection) { + c.reader = buf.NewBufferedReader(reader) + } +} + +func ConnectionOnClose(s *signal.Notifier) ConnectionOption { + return func(c *connection) { + c.onClose = s + } +} + +func NewConnection(opts ...ConnectionOption) net.Conn { c := &connection{ - input: input, - output: output, - localAddr: zeroAddr, - remoteAddr: zeroAddr, - reader: buf.NewBufferedReader(input), + done: signal.NewDone(), } - for _, opt := range options { + for _, opt := range opts { opt(c) } return c } -// Read implements net.Conn.Read(). +type connection struct { + reader *buf.BufferedReader + writer buf.Writer + done *signal.Done + onClose *signal.Notifier + local Addr + remote Addr +} + func (c *connection) Read(b []byte) (int, error) { - if c.closed { - return 0, io.EOF - } return c.reader.Read(b) } @@ -74,43 +86,44 @@ func (c *connection) ReadMultiBuffer() (buf.MultiBuffer, error) { // Write implements net.Conn.Write(). func (c *connection) Write(b []byte) (int, error) { - if c.closed { + if c.done.Done() { return 0, io.ErrClosedPipe } l := len(b) mb := buf.NewMultiBufferCap(int32(l)/buf.Size + 1) - mb.Write(b) - return l, c.output.WriteMultiBuffer(mb) + common.Must2(mb.Write(b)) + return l, c.writer.WriteMultiBuffer(mb) } func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error { - if c.closed { + if c.done.Done() { return io.ErrClosedPipe } - return c.output.WriteMultiBuffer(mb) + return c.writer.WriteMultiBuffer(mb) } // Close implements net.Conn.Close(). func (c *connection) Close() error { - c.closed = true - c.output.Close() - c.input.CloseError() - if c.closeSignal != nil { - c.closeSignal.Signal() + common.Must(c.done.Close()) + common.Close(c.reader) + common.Close(c.writer) + if c.onClose != nil { + c.onClose.Signal() } + return nil } // LocalAddr implements net.Conn.LocalAddr(). func (c *connection) LocalAddr() net.Addr { - return c.localAddr + return c.local } // RemoteAddr implements net.Conn.RemoteAddr(). func (c *connection) RemoteAddr() net.Addr { - return c.remoteAddr + return c.remote } // SetDeadline implements net.Conn.SetDeadline(). diff --git a/common/stats/io.go b/common/stats/io.go new file mode 100644 index 000000000..4d92f8b6a --- /dev/null +++ b/common/stats/io.go @@ -0,0 +1,26 @@ +package stats + +import ( + "v2ray.com/core" + "v2ray.com/core/common" + "v2ray.com/core/common/buf" + "v2ray.com/core/transport/pipe" +) + +type SizeStatWriter struct { + Counter core.StatCounter + Writer buf.Writer +} + +func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + w.Counter.Add(int64(mb.Len())) + return w.Writer.WriteMultiBuffer(mb) +} + +func (w *SizeStatWriter) Close() error { + return common.Close(w.Writer) +} + +func (w *SizeStatWriter) CloseError() { + pipe.CloseError(w.Writer) +} diff --git a/dial.go b/dial.go index 1819bcafd..2e60ef468 100644 --- a/dial.go +++ b/dial.go @@ -4,7 +4,6 @@ import ( "context" "v2ray.com/core/common/net" - "v2ray.com/core/transport/ray" ) // Dial provides an easy way for upstream caller to create net.Conn through V2Ray. @@ -16,5 +15,5 @@ func Dial(ctx context.Context, v *Instance, dest net.Destination) (net.Conn, err if err != nil { return nil, err } - return ray.NewConnection(r.InboundOutput(), r.InboundInput()), nil + return net.NewConnection(net.ConnectionInputMulti(r.Writer), net.ConnectionOutputMulti(r.Reader)), nil } diff --git a/network.go b/network.go index e6a0046cb..a164e54ac 100644 --- a/network.go +++ b/network.go @@ -6,7 +6,6 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" - "v2ray.com/core/transport/ray" ) // InboundHandler is the interface for handlers that process inbound connections. @@ -23,7 +22,7 @@ type InboundHandler interface { type OutboundHandler interface { common.Runnable Tag() string - Dispatch(ctx context.Context, outboundRay ray.OutboundRay) + Dispatch(ctx context.Context, link *Link) } // InboundHandlerManager is a feature that manages InboundHandlers. diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index da98efd12..e3e26410c 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -7,9 +7,10 @@ import ( "context" "time" + "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/proxy" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" ) // Handler is an outbound connection that silently swallow the entire payload. @@ -29,11 +30,11 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } // Process implements OutboundHandler.Dispatch(). -func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { - h.response.WriteTo(outboundRay.OutboundOutput()) +func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { + h.response.WriteTo(link.Writer) // Sleep a little here to make sure the response is sent to client. time.Sleep(time.Second) - outboundRay.OutboundOutput().CloseError() + pipe.CloseError(link.Writer) return nil } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index b78902a30..c8d296927 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -14,6 +14,7 @@ import ( "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" + "v2ray.com/core/transport/pipe" ) type DokodemoDoor struct { @@ -70,18 +71,18 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, d.policy().Timeouts.ConnectionIdle) - inboundRay, err := dispatcher.Dispatch(ctx, dest) + link, err := dispatcher.Dispatch(ctx, dest) if err != nil { return newError("failed to dispatch request").Base(err) } requestDone := func() error { - defer inboundRay.InboundInput().Close() + defer common.Close(link.Writer) defer timer.SetTimeout(d.policy().Timeouts.DownlinkOnly) chunkReader := buf.NewReader(conn) - if err := buf.Copy(chunkReader, inboundRay.InboundInput(), buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(chunkReader, link.Writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport request").Base(err) } @@ -108,7 +109,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in } } - if err := buf.Copy(inboundRay.InboundOutput(), writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(link.Reader, writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport response").Base(err) } @@ -116,8 +117,8 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - inboundRay.InboundInput().CloseError() - inboundRay.InboundOutput().CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) return newError("connection ends").Base(err) } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 466716f31..4f9b498a0 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -15,7 +15,6 @@ import ( "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" ) // Handler handles Freedom connections. @@ -65,7 +64,7 @@ func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address { } // Process implements proxy.Outbound. -func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { +func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { destination, _ := proxy.TargetFromContext(ctx) if h.config.DestinationOverride != nil { server := h.config.DestinationOverride.Server @@ -77,8 +76,8 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } newError("opening connection to ", destination).WithContext(ctx).WriteToLog() - input := outboundRay.OutboundInput() - output := outboundRay.OutboundOutput() + input := link.Reader + output := link.Writer if h.config.DomainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() { ip := h.resolveIP(ctx, destination.Address.Domain()) @@ -137,8 +136,6 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - input.CloseError() - output.CloseError() return newError("connection ends").Base(err) } diff --git a/proxy/http/server.go b/proxy/http/server.go index bbe2dc60d..9f2b6839d 100755 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "v2ray.com/core/transport/pipe" + "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" @@ -166,7 +168,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle) - ray, err := dispatcher.Dispatch(ctx, dest) + link, err := dispatcher.Dispatch(ctx, dest) if err != nil { return err } @@ -176,25 +178,25 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade if err != nil { return err } - if err := ray.InboundInput().WriteMultiBuffer(payload); err != nil { + if err := link.Writer.WriteMultiBuffer(payload); err != nil { return err } reader = nil } requestDone := func() error { - defer ray.InboundInput().Close() + defer common.Close(link.Writer) defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly) v2reader := buf.NewReader(conn) - return buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer)) + return buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)) } responseDone := func() error { defer timer.SetTimeout(s.policy().Timeouts.UplinkOnly) v2writer := buf.NewWriter(conn) - if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil { return err } @@ -202,8 +204,8 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - ray.InboundInput().CloseError() - ray.InboundOutput().CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) return newError("connection ends").Base(err) } @@ -241,20 +243,18 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri request.Header.Set("User-Agent", "") } - ray, err := dispatcher.Dispatch(ctx, dest) + link, err := dispatcher.Dispatch(ctx, dest) if err != nil { return err } - input := ray.InboundInput() - output := ray.InboundOutput() - defer input.Close() var result error = errWaitAnother requestDone := func() error { + defer common.Close(link.Writer) request.Header.Set("Connection", "close") - requestWriter := buf.NewBufferedWriter(ray.InboundInput()) + requestWriter := buf.NewBufferedWriter(link.Writer) common.Must(requestWriter.SetBuffered(false)) if err := request.Write(requestWriter); err != nil { return newError("failed to write whole request").Base(err).AtWarning() @@ -263,7 +263,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri } responseDone := func() error { - responseReader := bufio.NewReaderSize(buf.NewBufferedReader(ray.InboundOutput()), buf.Size) + responseReader := bufio.NewReaderSize(buf.NewBufferedReader(link.Reader), buf.Size) response, err := http.ReadResponse(responseReader, request) if err == nil { http_proto.RemoveHopByHopHeaders(response.Header) @@ -299,8 +299,8 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - input.CloseError() - output.CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) return newError("connection ends").Base(err) } diff --git a/proxy/proxy.go b/proxy/proxy.go index a0d0ac422..fbd48a9cc 100755 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -12,7 +12,6 @@ import ( "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" ) // An Inbound processes inbound connections. @@ -27,7 +26,7 @@ type Inbound interface { // An Outbound process outbound connections. type Outbound interface { // Process processes the given connection. The given dialer may be used to dial a system outbound connection. - Process(context.Context, ray.OutboundRay, Dialer) error + Process(context.Context, *core.Link, Dialer) error } // Dialer is used by OutboundHandler for creating outbound connections. diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 05f2ad1e0..57978cfca 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -12,7 +12,6 @@ import ( "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" ) // Client is a inbound handler for Shadowsocks protocol @@ -38,7 +37,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { } // Process implements OutboundHandler.Process(). -func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { +func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { destination, ok := proxy.TargetFromContext(ctx) if !ok { return newError("target not specified") @@ -107,7 +106,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - return buf.Copy(outboundRay.OutboundInput(), bodyWriter, buf.UpdateActivity(timer)) + return buf.Copy(link.Reader, bodyWriter, buf.UpdateActivity(timer)) } responseDone := func() error { @@ -118,7 +117,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale return err } - return buf.Copy(responseReader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)) + return buf.Copy(responseReader, link.Writer, buf.UpdateActivity(timer)) } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { @@ -138,7 +137,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - if err := buf.Copy(outboundRay.OutboundInput(), writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(link.Reader, writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all UDP request").Base(err) } return nil @@ -152,7 +151,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale User: user, } - if err := buf.Copy(reader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all UDP response").Base(err) } return nil diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index a93ab8632..96ac7eafe 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -14,6 +14,7 @@ import ( "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" + "v2ray.com/core/transport/pipe" ) type Server struct { @@ -167,7 +168,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) - ray, err := dispatcher.Dispatch(ctx, dest) + link, err := dispatcher.Dispatch(ctx, dest) if err != nil { return err } @@ -182,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, } { - payload, err := ray.InboundOutput().ReadMultiBuffer() + payload, err := link.Reader.ReadMultiBuffer() if err != nil { return err } @@ -195,7 +196,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, return err } - if err := buf.Copy(ray.InboundOutput(), responseWriter, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(link.Reader, responseWriter, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP response").Base(err) } @@ -204,9 +205,9 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - defer ray.InboundInput().Close() + defer common.Close(link.Writer) - if err := buf.Copy(bodyReader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(bodyReader, link.Writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP request").Base(err) } @@ -214,8 +215,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - ray.InboundInput().CloseError() - ray.InboundOutput().CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) return newError("connection ends").Base(err) } diff --git a/proxy/socks/client.go b/proxy/socks/client.go index 9abd15f61..e7fa5afdb 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -13,7 +13,6 @@ import ( "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" ) // Client is a Socks5 client. @@ -40,7 +39,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { } // Process implements proxy.Outbound.Process. -func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.Dialer) error { +func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { destination, ok := proxy.TargetFromContext(ctx) if !ok { return newError("target not specified.") @@ -107,11 +106,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. if request.Command == protocol.RequestCommandTCP { requestFunc = func() error { defer timer.SetTimeout(p.Timeouts.DownlinkOnly) - return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer)) + return buf.Copy(link.Reader, buf.NewWriter(conn), buf.UpdateActivity(timer)) } responseFunc = func() error { defer timer.SetTimeout(p.Timeouts.UplinkOnly) - return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer)) + return buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer)) } } else if request.Command == protocol.RequestCommandUDP { udpConn, err := dialer.Dial(ctx, udpRequest.Destination()) @@ -121,12 +120,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. defer udpConn.Close() requestFunc = func() error { defer timer.SetTimeout(p.Timeouts.DownlinkOnly) - return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer)) + return buf.Copy(link.Reader, buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer)) } responseFunc = func() error { defer timer.SetTimeout(p.Timeouts.UplinkOnly) reader := &UDPReader{reader: udpConn} - return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer)) + return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)) } } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index de3f0b0e4..d1a9308b2 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -15,6 +15,7 @@ import ( "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" + "v2ray.com/core/transport/pipe" ) // Server is a SOCKS 5 proxy server @@ -129,20 +130,17 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle) - ray, err := dispatcher.Dispatch(ctx, dest) + link, err := dispatcher.Dispatch(ctx, dest) if err != nil { return err } - input := ray.InboundInput() - output := ray.InboundOutput() - requestDone := func() error { defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly) - defer input.Close() + defer common.Close(link.Writer) v2reader := buf.NewReader(reader) - if err := buf.Copy(v2reader, input, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP request").Base(err) } @@ -153,7 +151,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ defer timer.SetTimeout(s.policy().Timeouts.UplinkOnly) v2writer := buf.NewWriter(writer) - if err := buf.Copy(output, v2writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP response").Base(err) } @@ -161,8 +159,8 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - input.CloseError() - output.CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) return newError("connection ends").Base(err) } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 752fe6114..c88ee2dae 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -22,7 +22,7 @@ import ( "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" ) type userByEmail struct { @@ -167,8 +167,8 @@ func (h *Handler) RemoveUser(ctx context.Context, email string) error { return nil } -func transferRequest(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error { - defer output.Close() +func transferRequest(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output buf.Writer) error { + defer common.Close(output) bodyReader := session.DecodeRequestBody(request, input) if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil { @@ -272,17 +272,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) - ray, err := dispatcher.Dispatch(ctx, request.Destination()) + link, err := dispatcher.Dispatch(ctx, request.Destination()) if err != nil { return newError("failed to dispatch request to ", request.Destination()).Base(err) } - input := ray.InboundInput() - output := ray.InboundOutput() - requestDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - return transferRequest(timer, session, request, reader, input) + return transferRequest(timer, session, request, reader, link.Writer) } responseDone := func() error { @@ -293,12 +290,12 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i response := &protocol.ResponseHeader{ Command: h.generateCommand(ctx, request), } - return transferResponse(timer, session, request, response, output, writer) + return transferResponse(timer, session, request, response, link.Reader, writer) } if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil { - input.CloseError() - output.CloseError() + pipe.CloseError(link.Reader) + pipe.CloseError(link.Writer) return newError("connection ends").Base(err) } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 54f86588a..6e42a1d07 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -6,6 +6,8 @@ import ( "context" "time" + "v2ray.com/core/transport/pipe" + "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" @@ -17,7 +19,6 @@ import ( "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" ) // Handler is an outbound connection handler for VMess protocol. @@ -42,7 +43,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } // Process implements proxy.Outbound.Process(). -func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error { +func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { var rec *protocol.ServerSpec var conn internet.Connection @@ -95,8 +96,8 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial request.Option.Set(protocol.RequestOptionChunkMasking) } - input := outboundRay.OutboundInput() - output := outboundRay.OutboundOutput() + input := link.Reader + output := link.Writer session := encoding.NewClientSession(protocol.DefaultIDHash) sessionPolicy := v.v.PolicyManager().ForLevel(request.User.Level) @@ -113,8 +114,8 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial } bodyWriter := session.EncodeRequestBody(request, writer) - { - firstPayload, err := input.ReadTimeout(time.Millisecond * 500) + if tReader, ok := input.(*pipe.Reader); ok { + firstPayload, err := tReader.ReadMultiBufferWithTimeout(time.Millisecond * 500) if err != nil && err != buf.ErrReadTimeout { return newError("failed to get first payload").Base(err) } diff --git a/router.go b/router.go index 4ea16cd4f..7913deea7 100644 --- a/router.go +++ b/router.go @@ -5,18 +5,24 @@ import ( "sync" "v2ray.com/core/common" + "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" "v2ray.com/core/common/net" - "v2ray.com/core/transport/ray" ) +// Link is a utility for connecting between an inbound and an outbound proxy handler. +type Link struct { + Reader buf.Reader + Writer buf.Writer +} + // Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules. // Dispatcher is required to be registered in a V2Ray instance to make V2Ray function properly. type Dispatcher interface { Feature // Dispatch returns a Ray for transporting data for the given request. - Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) + Dispatch(ctx context.Context, dest net.Destination) (*Link, error) } type syncDispatcher struct { @@ -24,7 +30,7 @@ type syncDispatcher struct { Dispatcher } -func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) { +func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*Link, error) { d.RLock() defer d.RUnlock() diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 5fd3bf824..792e92310 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -6,18 +6,18 @@ import ( "time" "v2ray.com/core" + "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" "v2ray.com/core/common/signal" - "v2ray.com/core/transport/ray" ) type ResponseCallback func(payload *buf.Buffer) type connEntry struct { - inbound ray.InboundRay - timer signal.ActivityUpdater - cancel context.CancelFunc + link *core.Link + timer signal.ActivityUpdater + cancel context.CancelFunc } type Dispatcher struct { @@ -37,8 +37,8 @@ func (v *Dispatcher) RemoveRay(dest net.Destination) { v.Lock() defer v.Unlock() if conn, found := v.conns[dest]; found { - conn.inbound.InboundInput().Close() - conn.inbound.InboundOutput().Close() + common.Close(conn.link.Reader) + common.Close(conn.link.Writer) delete(v.conns, dest) } } @@ -59,11 +59,11 @@ func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallba v.RemoveRay(dest) } timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4) - inboundRay, _ := v.dispatcher.Dispatch(ctx, dest) + link, _ := v.dispatcher.Dispatch(ctx, dest) entry := &connEntry{ - inbound: inboundRay, - timer: timer, - cancel: removeRay, + link: link, + timer: timer, + cancel: removeRay, } v.conns[dest] = entry go handleInput(ctx, entry, callback) @@ -75,7 +75,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, newError("dispatch request to: ", destination).AtDebug().WithContext(ctx).WriteToLog() conn := v.getInboundRay(destination, callback) - outputStream := conn.inbound.InboundInput() + outputStream := conn.link.Writer if outputStream != nil { if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil { newError("failed to write first UDP payload").Base(err).WithContext(ctx).WriteToLog() @@ -86,7 +86,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, } func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) { - input := conn.inbound.InboundOutput() + input := conn.link.Reader timer := conn.timer for { diff --git a/transport/internet/udp/dispatcher_test.go b/transport/internet/udp/dispatcher_test.go index 22f4a4277..4d3323429 100644 --- a/transport/internet/udp/dispatcher_test.go +++ b/transport/internet/udp/dispatcher_test.go @@ -6,18 +6,19 @@ import ( "testing" "time" + "v2ray.com/core" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" . "v2ray.com/core/transport/internet/udp" - "v2ray.com/core/transport/ray" + "v2ray.com/core/transport/pipe" . "v2ray.com/ext/assert" ) type TestDispatcher struct { - OnDispatch func(ctx context.Context, dest net.Destination) (ray.InboundRay, error) + OnDispatch func(ctx context.Context, dest net.Destination) (*core.Link, error) } -func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) { +func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) { return d.OnDispatch(ctx, dest) } @@ -33,23 +34,25 @@ func TestSameDestinationDispatching(t *testing.T) { assert := With(t) ctx, cancel := context.WithCancel(context.Background()) - link := ray.New(ctx) + uplinkReader, uplinkWriter := pipe.New() + downlinkReader, downlinkWriter := pipe.New() + go func() { for { - data, err := link.OutboundInput().ReadMultiBuffer() + data, err := uplinkReader.ReadMultiBuffer() if err != nil { break } - err = link.OutboundOutput().WriteMultiBuffer(data) + err = downlinkWriter.WriteMultiBuffer(data) assert(err, IsNil) } }() var count uint32 td := &TestDispatcher{ - OnDispatch: func(ctx context.Context, dest net.Destination) (ray.InboundRay, error) { + OnDispatch: func(ctx context.Context, dest net.Destination) (*core.Link, error) { atomic.AddUint32(&count, 1) - return link, nil + return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil }, } dest := net.UDPDestination(net.LocalHostIP, 53) diff --git a/transport/pipe/impl.go b/transport/pipe/impl.go index f1f8e8f69..6439e3efe 100644 --- a/transport/pipe/impl.go +++ b/transport/pipe/impl.go @@ -6,7 +6,6 @@ import ( "time" "v2ray.com/core/common/buf" - "v2ray.com/core/common/errors" "v2ray.com/core/common/signal" ) @@ -70,8 +69,6 @@ func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) { } } -var ErrTimeout = errors.New("Timeout on reading pipeline.") - func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) { timer := time.After(d) for { @@ -84,7 +81,7 @@ func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, err select { case <-p.readSignal.Wait(): case <-timer: - return nil, ErrTimeout + return nil, buf.ErrReadTimeout } } } @@ -120,6 +117,10 @@ func (p *pipe) Close() error { p.Lock() defer p.Unlock() + if p.state == closed || p.state == errord { + return nil + } + p.state = closed p.readSignal.Signal() p.writeSignal.Signal() diff --git a/transport/pipe/pipe.go b/transport/pipe/pipe.go index e3dc238d7..a53e8b8d9 100644 --- a/transport/pipe/pipe.go +++ b/transport/pipe/pipe.go @@ -37,6 +37,16 @@ func New(opts ...Option) (*Reader, *Writer) { } } +type closeError interface { + CloseError() +} + +func CloseError(v interface{}) { + if c, ok := v.(closeError); ok { + c.CloseError() + } +} + var defaultLimit int32 = 10 * 1024 * 1024 func init() { diff --git a/transport/ray/direct.go b/transport/ray/direct.go deleted file mode 100644 index c84b0b227..000000000 --- a/transport/ray/direct.go +++ /dev/null @@ -1,268 +0,0 @@ -package ray - -import ( - "context" - "io" - "sync" - "time" - - "v2ray.com/core/common" - "v2ray.com/core/common/buf" - "v2ray.com/core/common/platform" - "v2ray.com/core/common/signal" -) - -type Option func(*directRay) - -type addInt64 interface { - Add(int64) int64 -} - -func WithUplinkStatCounter(c addInt64) Option { - return func(s *directRay) { - s.Input.onDataSize = append(s.Input.onDataSize, func(delta uint64) { - c.Add(int64(delta)) - }) - } -} - -func WithDownlinkStatCounter(c addInt64) Option { - return func(s *directRay) { - s.Output.onDataSize = append(s.Output.onDataSize, func(delta uint64) { - c.Add(int64(delta)) - }) - } -} - -// New creates a new Ray for direct traffic transport. -func New(ctx context.Context, opts ...Option) Ray { - r := &directRay{ - Input: NewStream(ctx), - Output: NewStream(ctx), - } - for _, opt := range opts { - opt(r) - } - return r -} - -type directRay struct { - Input *Stream - Output *Stream -} - -func (v *directRay) OutboundInput() InputStream { - return v.Input -} - -func (v *directRay) OutboundOutput() OutputStream { - return v.Output -} - -func (v *directRay) InboundInput() OutputStream { - return v.Input -} - -func (v *directRay) InboundOutput() InputStream { - return v.Output -} - -var streamSizeLimit uint64 = 10 * 1024 * 1024 - -func init() { - const raySizeEnvKey = "v2ray.ray.buffer.size" - size := platform.EnvFlag{ - Name: raySizeEnvKey, - AltName: platform.NormalizeEnvName(raySizeEnvKey), - }.GetValueAsInt(10) - streamSizeLimit = uint64(size) * 1024 * 1024 -} - -// Stream is a sequential container for data in bytes. -type Stream struct { - access sync.RWMutex - data buf.MultiBuffer - size uint64 - ctx context.Context - readSignal *signal.Notifier - writeSignal *signal.Notifier - onDataSize []func(uint64) - close bool - err bool -} - -// NewStream creates a new Stream. -func NewStream(ctx context.Context) *Stream { - s := &Stream{ - ctx: ctx, - readSignal: signal.NewNotifier(), - writeSignal: signal.NewNotifier(), - size: 0, - } - return s -} - -func (s *Stream) getData() (buf.MultiBuffer, error) { - s.access.Lock() - defer s.access.Unlock() - - if s.data != nil { - mb := s.data - s.data = nil - s.size = 0 - return mb, nil - } - - if s.err { - return nil, io.ErrClosedPipe - } - - if s.close { - return nil, io.EOF - } - - return nil, nil -} - -// Peek fills in the given buffer with data from head of the Stream. -func (s *Stream) Peek(b *buf.Buffer) { - s.access.RLock() - defer s.access.RUnlock() - - common.Must(b.Reset(func(data []byte) (int, error) { - return s.data.Copy(data), nil - })) -} - -// ReadMultiBuffer reads data from the Stream. -func (s *Stream) ReadMultiBuffer() (buf.MultiBuffer, error) { - for { - mb, err := s.getData() - if err != nil { - return nil, err - } - - if mb != nil { - s.readSignal.Signal() - return mb, nil - } - - select { - case <-s.ctx.Done(): - return nil, s.ctx.Err() - case <-s.writeSignal.Wait(): - } - } -} - -// ReadTimeout reads from the Stream with a specified timeout. -func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) { - for { - mb, err := s.getData() - if err != nil { - return nil, err - } - - if mb != nil { - s.readSignal.Signal() - return mb, nil - } - - select { - case <-s.ctx.Done(): - return nil, s.ctx.Err() - case <-time.After(timeout): - return nil, buf.ErrReadTimeout - case <-s.writeSignal.Wait(): - } - } -} - -// Size returns the number of bytes hold in the Stream. -func (s *Stream) Size() uint64 { - s.access.RLock() - defer s.access.RUnlock() - - return s.size -} - -// waitForStreamSize waits until the Stream has room for more data, or any error happens. -func (s *Stream) waitForStreamSize() error { - if streamSizeLimit == 0 { - return nil - } - - for s.Size() >= streamSizeLimit { - select { - case <-s.ctx.Done(): - return s.ctx.Err() - case <-s.readSignal.Wait(): - if s.err || s.close { - return io.ErrClosedPipe - } - } - } - - return nil -} - -// WriteMultiBuffer writes more data into the Stream. -func (s *Stream) WriteMultiBuffer(data buf.MultiBuffer) error { - if data.IsEmpty() { - return nil - } - - if err := s.waitForStreamSize(); err != nil { - data.Release() - return err - } - - s.access.Lock() - defer s.access.Unlock() - - if s.err || s.close { - data.Release() - return io.ErrClosedPipe - } - - if s.data == nil { - s.data = buf.NewMultiBufferCap(128) - } - - dataSize := uint64(data.Len()) - for _, f := range s.onDataSize { - f(dataSize) - } - - s.data.AppendMulti(data) - s.size += dataSize - s.writeSignal.Signal() - - return nil -} - -// Close closes the stream for writing. Read() still works until EOF. -func (s *Stream) Close() error { - s.access.Lock() - s.close = true - s.readSignal.Signal() - s.writeSignal.Signal() - s.access.Unlock() - return nil -} - -// CloseError closes the Stream with error. Read() will return an error afterwards. -func (s *Stream) CloseError() { - s.access.Lock() - s.err = true - if s.data != nil { - s.data.Release() - s.data = nil - s.size = 0 - } - s.access.Unlock() - - s.readSignal.Signal() - s.writeSignal.Signal() - -} diff --git a/transport/ray/direct_test.go b/transport/ray/direct_test.go deleted file mode 100644 index 64c0ab1d0..000000000 --- a/transport/ray/direct_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package ray_test - -import ( - "context" - "io" - "testing" - - "v2ray.com/core/common/buf" - . "v2ray.com/core/transport/ray" - . "v2ray.com/ext/assert" -) - -func TestStreamIO(t *testing.T) { - assert := With(t) - - stream := NewStream(context.Background()) - b1 := buf.New() - b1.AppendBytes('a') - assert(stream.WriteMultiBuffer(buf.NewMultiBufferValue(b1)), IsNil) - - _, err := stream.ReadMultiBuffer() - assert(err, IsNil) - - stream.Close() - _, err = stream.ReadMultiBuffer() - assert(err, Equals, io.EOF) - - b2 := buf.New() - b2.AppendBytes('b') - err = stream.WriteMultiBuffer(buf.NewMultiBufferValue(b2)) - assert(err, Equals, io.ErrClosedPipe) -} - -func TestStreamClose(t *testing.T) { - assert := With(t) - - stream := NewStream(context.Background()) - b1 := buf.New() - b1.AppendBytes('a') - assert(stream.WriteMultiBuffer(buf.NewMultiBufferValue(b1)), IsNil) - - stream.Close() - - _, err := stream.ReadMultiBuffer() - assert(err, IsNil) - - _, err = stream.ReadMultiBuffer() - assert(err, Equals, io.EOF) -} diff --git a/transport/ray/ray.go b/transport/ray/ray.go deleted file mode 100644 index 0c779784c..000000000 --- a/transport/ray/ray.go +++ /dev/null @@ -1,54 +0,0 @@ -package ray - -import ( - "v2ray.com/core/common" - "v2ray.com/core/common/buf" -) - -// OutboundRay is a transport interface for outbound connections. -type OutboundRay interface { - // OutboundInput provides a stream for the input of the outbound connection. - // The outbound connection shall write all the input until it is closed. - OutboundInput() InputStream - - // OutboundOutput provides a stream to retrieve the response from the - // outbound connection. The outbound connection shall close the channel - // after all responses are received and put into the channel. - OutboundOutput() OutputStream -} - -// InboundRay is a transport interface for inbound connections. -type InboundRay interface { - // InboundInput provides a stream to retrieve the request from client. - // The inbound connection shall close the channel after the entire request - // is received and put into the channel. - InboundInput() OutputStream - - // InboundOutput provides a stream of data for the inbound connection to write - // as response. The inbound connection shall write all the data from the - // channel until it is closed. - InboundOutput() InputStream -} - -// Ray is an internal transport channel between inbound and outbound connection. -type Ray interface { - InboundRay - OutboundRay -} - -type RayStream interface { - common.Closable - CloseError() -} - -type InputStream interface { - buf.Reader - buf.TimeoutReader - RayStream - Peek(*buf.Buffer) -} - -type OutputStream interface { - buf.Writer - RayStream -}