1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-06-17 13:05:24 +00:00

first part to move feature interfaces into dedicated directory

This commit is contained in:
Darien Raymond 2018-10-11 20:43:37 +02:00
parent 88387f2d6e
commit b6dc31d3fe
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
40 changed files with 263 additions and 196 deletions

View File

@ -8,9 +8,11 @@ import (
"sync" "sync"
"google.golang.org/grpc" "google.golang.org/grpc"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/features/outbound"
) )
// Commander is a V2Ray feature that provides gRPC methods to external clients. // Commander is a V2Ray feature that provides gRPC methods to external clients.
@ -19,7 +21,7 @@ type Commander struct {
server *grpc.Server server *grpc.Server
config Config config Config
v *core.Instance v *core.Instance
ohm core.OutboundHandlerManager ohm outbound.HandlerManager
} }
// NewCommander creates a new Commander based on the given config. // NewCommander creates a new Commander based on the given config.

View File

@ -4,10 +4,10 @@ import (
"context" "context"
"sync" "sync"
"v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/common/vio"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -60,7 +60,7 @@ func (l *OutboundListener) Addr() net.Addr {
} }
} }
// Outbound is a core.OutboundHandler that handles gRPC connections. // Outbound is a outbound.Handler that handles gRPC connections.
type Outbound struct { type Outbound struct {
tag string tag string
listener *OutboundListener listener *OutboundListener
@ -68,8 +68,8 @@ type Outbound struct {
closed bool closed bool
} }
// Dispatch implements core.OutboundHandler. // Dispatch implements outbound.Handler.
func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) { func (co *Outbound) Dispatch(ctx context.Context, link *vio.Link) {
co.access.RLock() co.access.RLock()
if co.closed { if co.closed {
@ -86,7 +86,7 @@ func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
<-closeSignal.Wait() <-closeSignal.Wait()
} }
// Tag implements core.OutboundHandler. // Tag implements outbound.Handler.
func (co *Outbound) Tag() string { func (co *Outbound) Tag() string {
return co.tag return co.tag
} }

View File

@ -16,6 +16,9 @@ import (
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/stats" "v2ray.com/core/common/stats"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -79,8 +82,8 @@ func (r *cachedReader) CloseError() {
// DefaultDispatcher is a default implementation of Dispatcher. // DefaultDispatcher is a default implementation of Dispatcher.
type DefaultDispatcher struct { type DefaultDispatcher struct {
ohm core.OutboundHandlerManager ohm outbound.HandlerManager
router core.Router router routing.Router
policy core.PolicyManager policy core.PolicyManager
stats core.StatManager stats core.StatManager
} }
@ -95,7 +98,7 @@ func NewDefaultDispatcher(ctx context.Context, config *Config) (*DefaultDispatch
stats: v.Stats(), stats: v.Stats(),
} }
if err := v.RegisterFeature((*core.Dispatcher)(nil), d); err != nil { if err := v.RegisterFeature((*routing.Dispatcher)(nil), d); err != nil {
return nil, newError("unable to register Dispatcher").Base(err) return nil, newError("unable to register Dispatcher").Base(err)
} }
return d, nil return d, nil
@ -109,17 +112,17 @@ func (*DefaultDispatcher) Start() error {
// Close implements common.Closable. // Close implements common.Closable.
func (*DefaultDispatcher) Close() error { return nil } func (*DefaultDispatcher) Close() error { return nil }
func (d *DefaultDispatcher) getLink(ctx context.Context) (*core.Link, *core.Link) { func (d *DefaultDispatcher) getLink(ctx context.Context) (*vio.Link, *vio.Link) {
opt := pipe.OptionsFromContext(ctx) opt := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opt...) uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...) downlinkReader, downlinkWriter := pipe.New(opt...)
inboundLink := &core.Link{ inboundLink := &vio.Link{
Reader: downlinkReader, Reader: downlinkReader,
Writer: uplinkWriter, Writer: uplinkWriter,
} }
outboundLink := &core.Link{ outboundLink := &vio.Link{
Reader: uplinkReader, Reader: uplinkReader,
Writer: downlinkWriter, Writer: downlinkWriter,
} }
@ -159,8 +162,8 @@ func shouldOverride(result SniffResult, domainOverride []string) bool {
return false return false
} }
// Dispatch implements core.Dispatcher. // Dispatch implements routing.Dispatcher.
func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*core.Link, error) { func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*vio.Link, error) {
if !destination.IsValid() { if !destination.IsValid() {
panic("Dispatcher: Invalid destination.") panic("Dispatcher: Invalid destination.")
} }
@ -214,7 +217,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) {
cReader.Cache(payload) cReader.Cache(payload)
if !payload.IsEmpty() { if !payload.IsEmpty() {
result, err := sniffer.Sniff(payload.Bytes()) result, err := sniffer.Sniff(payload.Bytes())
if err != core.ErrNoClue { if err != common.ErrNoClue {
return result, err return result, err
} }
} }
@ -225,7 +228,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) {
} }
} }
func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link, destination net.Destination) { func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *vio.Link, destination net.Destination) {
dispatcher := d.ohm.GetDefaultHandler() dispatcher := d.ohm.GetDefaultHandler()
if d.router != nil { if d.router != nil {
if tag, err := d.router.PickRoute(ctx); err == nil { if tag, err := d.router.PickRoute(ctx); err == nil {

View File

@ -1,7 +1,7 @@
package dispatcher package dispatcher
import ( import (
"v2ray.com/core" "v2ray.com/core/common"
"v2ray.com/core/common/protocol/bittorrent" "v2ray.com/core/common/protocol/bittorrent"
"v2ray.com/core/common/protocol/http" "v2ray.com/core/common/protocol/http"
"v2ray.com/core/common/protocol/tls" "v2ray.com/core/common/protocol/tls"
@ -34,7 +34,7 @@ func (s *Sniffer) Sniff(payload []byte) (SniffResult, error) {
var pendingSniffer []protocolSniffer var pendingSniffer []protocolSniffer
for _, s := range s.sniffer { for _, s := range s.sniffer {
result, err := s(payload) result, err := s(payload)
if err == core.ErrNoClue { if err == common.ErrNoClue {
pendingSniffer = append(pendingSniffer, s) pendingSniffer = append(pendingSniffer, s)
continue continue
} }
@ -46,7 +46,7 @@ func (s *Sniffer) Sniff(payload []byte) (SniffResult, error) {
if len(pendingSniffer) > 0 { if len(pendingSniffer) > 0 {
s.sniffer = pendingSniffer s.sniffer = pendingSniffer
return nil, core.ErrNoClue return nil, common.ErrNoClue
} }
return nil, errUnknownContent return nil, errUnknownContent

View File

@ -7,9 +7,9 @@ import (
"time" "time"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/features/routing"
"github.com/miekg/dns" "github.com/miekg/dns"
"v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
@ -48,7 +48,7 @@ type ClassicNameServer struct {
clientIP net.IP clientIP net.IP
} }
func NewClassicNameServer(address net.Destination, dispatcher core.Dispatcher, clientIP net.IP) *ClassicNameServer { func NewClassicNameServer(address net.Destination, dispatcher routing.Dispatcher, clientIP net.IP) *ClassicNameServer {
s := &ClassicNameServer{ s := &ClassicNameServer{
address: address, address: address,
ips: make(map[string][]IPRecord), ips: make(map[string][]IPRecord),

View File

@ -6,6 +6,7 @@ import (
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/features/outbound"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
) )
@ -18,7 +19,7 @@ type InboundOperation interface {
// OutboundOperation is the interface for operations that applies to outbound handlers. // OutboundOperation is the interface for operations that applies to outbound handlers.
type OutboundOperation interface { type OutboundOperation interface {
// ApplyOutbound applies this operation to the given outbound handler. // ApplyOutbound applies this operation to the given outbound handler.
ApplyOutbound(context.Context, core.OutboundHandler) error ApplyOutbound(context.Context, outbound.Handler) error
} }
func getInbound(handler core.InboundHandler) (proxy.Inbound, error) { func getInbound(handler core.InboundHandler) (proxy.Inbound, error) {
@ -62,7 +63,7 @@ func (op *RemoveUserOperation) ApplyInbound(ctx context.Context, handler core.In
type handlerServer struct { type handlerServer struct {
s *core.Instance s *core.Instance
ihm core.InboundHandlerManager ihm core.InboundHandlerManager
ohm core.OutboundHandlerManager ohm outbound.HandlerManager
} }
func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) { func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) {
@ -104,7 +105,7 @@ func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundReq
if err != nil { if err != nil {
return nil, err return nil, err
} }
handler, ok := rawHandler.(core.OutboundHandler) handler, ok := rawHandler.(outbound.Handler)
if !ok { if !ok {
return nil, newError("not an OutboundHandler.") return nil, newError("not an OutboundHandler.")
} }

View File

@ -67,7 +67,7 @@ func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandl
// RemoveHandler implements core.InboundHandlerManager. // RemoveHandler implements core.InboundHandlerManager.
func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if len(tag) == 0 { if len(tag) == 0 {
return core.ErrNoClue return common.ErrNoClue
} }
m.access.Lock() m.access.Lock()
@ -81,7 +81,7 @@ func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
return nil return nil
} }
return core.ErrNoClue return common.ErrNoClue
} }
// Start implements common.Runnable. // Start implements common.Runnable.

View File

@ -15,6 +15,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tcp" "v2ray.com/core/transport/internet/tcp"
@ -36,7 +37,7 @@ type tcpWorker struct {
stream *internet.MemoryStreamConfig stream *internet.MemoryStreamConfig
recvOrigDest bool recvOrigDest bool
tag string tag string
dispatcher core.Dispatcher dispatcher routing.Dispatcher
sniffingConfig *proxyman.SniffingConfig sniffingConfig *proxyman.SniffingConfig
uplinkCounter core.StatCounter uplinkCounter core.StatCounter
downlinkCounter core.StatCounter downlinkCounter core.StatCounter
@ -223,7 +224,7 @@ type udpWorker struct {
port net.Port port net.Port
tag string tag string
stream *internet.MemoryStreamConfig stream *internet.MemoryStreamConfig
dispatcher core.Dispatcher dispatcher routing.Dispatcher
uplinkCounter core.StatCounter uplinkCounter core.StatCounter
downlinkCounter core.StatCounter downlinkCounter core.StatCounter

View File

@ -18,6 +18,8 @@ import (
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -42,7 +44,7 @@ func NewClientManager(p proxy.Outbound, d proxy.Dialer, c *proxyman.Multiplexing
} }
} }
func (m *ClientManager) Dispatch(ctx context.Context, link *core.Link) error { func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error {
m.access.Lock() m.access.Lock()
defer m.access.Unlock() defer m.access.Unlock()
@ -77,7 +79,7 @@ func (m *ClientManager) onClientFinish() {
type Client struct { type Client struct {
sessionManager *SessionManager sessionManager *SessionManager
link core.Link link vio.Link
done *done.Instance done *done.Instance
manager *ClientManager manager *ClientManager
concurrency uint32 concurrency uint32
@ -99,7 +101,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C
c := &Client{ c := &Client{
sessionManager: NewSessionManager(), sessionManager: NewSessionManager(),
link: core.Link{ link: vio.Link{
Reader: downlinkReader, Reader: downlinkReader,
Writer: upLinkWriter, Writer: upLinkWriter,
}, },
@ -109,7 +111,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C
} }
go func() { go func() {
if err := p.Process(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil { if err := p.Process(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil {
errors.New("failed to handler mux client connection").Base(err).WriteToLog() errors.New("failed to handler mux client connection").Base(err).WriteToLog()
} }
common.Must(c.done.Close()) common.Must(c.done.Close())
@ -188,7 +190,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
} }
} }
func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool { func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
sm := m.sessionManager sm := m.sessionManager
if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal { if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal {
return false return false
@ -297,7 +299,7 @@ func (m *Client) fetchOutput() {
} }
type Server struct { type Server struct {
dispatcher core.Dispatcher dispatcher routing.Dispatcher
} }
// NewServer creates a new mux.Server. // NewServer creates a new mux.Server.
@ -308,7 +310,7 @@ func NewServer(ctx context.Context) *Server {
return s return s
} }
func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) { func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
if dest.Address != muxCoolAddress { if dest.Address != muxCoolAddress {
return s.dispatcher.Dispatch(ctx, dest) return s.dispatcher.Dispatch(ctx, dest)
} }
@ -319,14 +321,14 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link
worker := &ServerWorker{ worker := &ServerWorker{
dispatcher: s.dispatcher, dispatcher: s.dispatcher,
link: &core.Link{ link: &vio.Link{
Reader: uplinkReader, Reader: uplinkReader,
Writer: downlinkWriter, Writer: downlinkWriter,
}, },
sessionManager: NewSessionManager(), sessionManager: NewSessionManager(),
} }
go worker.run(ctx) go worker.run(ctx)
return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
} }
// Start implements common.Runnable. // Start implements common.Runnable.
@ -340,8 +342,8 @@ func (s *Server) Close() error {
} }
type ServerWorker struct { type ServerWorker struct {
dispatcher core.Dispatcher dispatcher routing.Dispatcher
link *core.Link link *vio.Link
sessionManager *SessionManager sessionManager *SessionManager
} }

View File

@ -9,6 +9,8 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -19,11 +21,11 @@ type Handler struct {
senderSettings *proxyman.SenderConfig senderSettings *proxyman.SenderConfig
streamSettings *internet.MemoryStreamConfig streamSettings *internet.MemoryStreamConfig
proxy proxy.Outbound proxy proxy.Outbound
outboundManager core.OutboundHandlerManager outboundManager outbound.HandlerManager
mux *mux.ClientManager mux *mux.ClientManager
} }
func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (core.OutboundHandler, error) { func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbound.Handler, error) {
v := core.MustFromContext(ctx) v := core.MustFromContext(ctx)
h := &Handler{ h := &Handler{
config: config, config: config,
@ -75,13 +77,13 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (core.O
return h, nil return h, nil
} }
// Tag implements core.OutboundHandler. // Tag implements outbound.Handler.
func (h *Handler) Tag() string { func (h *Handler) Tag() string {
return h.config.Tag return h.config.Tag
} }
// Dispatch implements proxy.Outbound.Dispatch. // Dispatch implements proxy.Outbound.Dispatch.
func (h *Handler) Dispatch(ctx context.Context, link *core.Link) { func (h *Handler) Dispatch(ctx context.Context, link *vio.Link) {
if h.mux != nil { if h.mux != nil {
if err := h.mux.Dispatch(ctx, link); err != nil { if err := h.mux.Dispatch(ctx, link); err != nil {
newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
@ -115,7 +117,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
uplinkReader, uplinkWriter := pipe.New(opts...) uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...) downlinkReader, downlinkWriter := pipe.New(opts...)
go handler.Dispatch(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}) go handler.Dispatch(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter})
return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil
} }

View File

@ -3,14 +3,14 @@ package outbound_test
import ( import (
"testing" "testing"
"v2ray.com/core"
. "v2ray.com/core/app/proxyman/outbound" . "v2ray.com/core/app/proxyman/outbound"
"v2ray.com/core/features/outbound"
. "v2ray.com/ext/assert" . "v2ray.com/ext/assert"
) )
func TestInterfaces(t *testing.T) { func TestInterfaces(t *testing.T) {
assert := With(t) assert := With(t)
assert((*Handler)(nil), Implements, (*core.OutboundHandler)(nil)) assert((*Handler)(nil), Implements, (*outbound.Handler)(nil))
assert((*Manager)(nil), Implements, (*core.OutboundHandlerManager)(nil)) assert((*Manager)(nil), Implements, (*outbound.HandlerManager)(nil))
} }

View File

@ -9,24 +9,25 @@ import (
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/app/proxyman" "v2ray.com/core/app/proxyman"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/features/outbound"
) )
// Manager is to manage all outbound handlers. // Manager is to manage all outbound handlers.
type Manager struct { type Manager struct {
access sync.RWMutex access sync.RWMutex
defaultHandler core.OutboundHandler defaultHandler outbound.Handler
taggedHandler map[string]core.OutboundHandler taggedHandler map[string]outbound.Handler
untaggedHandlers []core.OutboundHandler untaggedHandlers []outbound.Handler
running bool running bool
} }
// New creates a new Manager. // New creates a new Manager.
func New(ctx context.Context, config *proxyman.OutboundConfig) (*Manager, error) { func New(ctx context.Context, config *proxyman.OutboundConfig) (*Manager, error) {
m := &Manager{ m := &Manager{
taggedHandler: make(map[string]core.OutboundHandler), taggedHandler: make(map[string]outbound.Handler),
} }
v := core.MustFromContext(ctx) v := core.MustFromContext(ctx)
if err := v.RegisterFeature((*core.OutboundHandlerManager)(nil), m); err != nil { if err := v.RegisterFeature((*outbound.HandlerManager)(nil), m); err != nil {
return nil, newError("unable to register OutboundHandlerManager").Base(err) return nil, newError("unable to register OutboundHandlerManager").Base(err)
} }
return m, nil return m, nil
@ -72,8 +73,8 @@ func (m *Manager) Close() error {
return nil return nil
} }
// GetDefaultHandler implements core.OutboundHandlerManager. // GetDefaultHandler implements outbound.HandlerManager.
func (m *Manager) GetDefaultHandler() core.OutboundHandler { func (m *Manager) GetDefaultHandler() outbound.Handler {
m.access.RLock() m.access.RLock()
defer m.access.RUnlock() defer m.access.RUnlock()
@ -83,8 +84,8 @@ func (m *Manager) GetDefaultHandler() core.OutboundHandler {
return m.defaultHandler return m.defaultHandler
} }
// GetHandler implements core.OutboundHandlerManager. // GetHandler implements outbound.HandlerManager.
func (m *Manager) GetHandler(tag string) core.OutboundHandler { func (m *Manager) GetHandler(tag string) outbound.Handler {
m.access.RLock() m.access.RLock()
defer m.access.RUnlock() defer m.access.RUnlock()
if handler, found := m.taggedHandler[tag]; found { if handler, found := m.taggedHandler[tag]; found {
@ -93,8 +94,8 @@ func (m *Manager) GetHandler(tag string) core.OutboundHandler {
return nil return nil
} }
// AddHandler implements core.OutboundHandlerManager. // AddHandler implements outbound.HandlerManager.
func (m *Manager) AddHandler(ctx context.Context, handler core.OutboundHandler) error { func (m *Manager) AddHandler(ctx context.Context, handler outbound.Handler) error {
m.access.Lock() m.access.Lock()
defer m.access.Unlock() defer m.access.Unlock()
@ -116,10 +117,10 @@ func (m *Manager) AddHandler(ctx context.Context, handler core.OutboundHandler)
return nil return nil
} }
// RemoveHandler implements core.OutboundHandlerManager. // RemoveHandler implements outbound.HandlerManager.
func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if len(tag) == 0 { if len(tag) == 0 {
return core.ErrNoClue return common.ErrNoClue
} }
m.access.Lock() m.access.Lock()
defer m.access.Unlock() defer m.access.Unlock()

View File

@ -6,6 +6,7 @@ import (
"context" "context"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/features/routing"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
@ -13,7 +14,7 @@ import (
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
) )
// Router is an implementation of core.Router. // Router is an implementation of routing.Router.
type Router struct { type Router struct {
domainStrategy Config_DomainStrategy domainStrategy Config_DomainStrategy
rules []Rule rules []Rule
@ -38,7 +39,7 @@ func NewRouter(ctx context.Context, config *Config) (*Router, error) {
r.rules[idx].Condition = cond r.rules[idx].Condition = cond
} }
if err := v.RegisterFeature((*core.Router)(nil), r); err != nil { if err := v.RegisterFeature((*routing.Router)(nil), r); err != nil {
return nil, newError("unable to register Router").Base(err) return nil, newError("unable to register Router").Base(err)
} }
return r, nil return r, nil
@ -72,7 +73,7 @@ func (r *ipResolver) Resolve() []net.Address {
return r.ip return r.ip
} }
// PickRoute implements core.Router. // PickRoute implements routing.Router.
func (r *Router) PickRoute(ctx context.Context) (string, error) { func (r *Router) PickRoute(ctx context.Context) (string, error) {
resolver := &ipResolver{ resolver := &ipResolver{
dns: r.dns, dns: r.dns,
@ -93,7 +94,7 @@ func (r *Router) PickRoute(ctx context.Context) (string, error) {
} }
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return "", core.ErrNoClue return "", common.ErrNoClue
} }
dest := outbound.Target dest := outbound.Target
@ -110,7 +111,7 @@ func (r *Router) PickRoute(ctx context.Context) (string, error) {
} }
} }
return "", core.ErrNoClue return "", common.ErrNoClue
} }
// Start implements common.Runnable. // Start implements common.Runnable.

View File

@ -2,8 +2,15 @@
// See each sub-package for detail. // See each sub-package for detail.
package common package common
import "v2ray.com/core/common/errors"
//go:generate errorgen //go:generate errorgen
var (
// ErrNoClue is for the situation that existing information is not enough to make a decision. For example, Router may return this error when there is no suitable route.
ErrNoClue = errors.New("not enough information for making a decision")
)
// Must panics if err is not nil. // Must panics if err is not nil.
func Must(err error) { func Must(err error) {
if err != nil { if err != nil {

View File

@ -3,7 +3,7 @@ package bittorrent
import ( import (
"errors" "errors"
"v2ray.com/core" "v2ray.com/core/common"
) )
type SniffHeader struct { type SniffHeader struct {
@ -21,7 +21,7 @@ var errNotBittorrent = errors.New("not bittorrent header")
func SniffBittorrent(b []byte) (*SniffHeader, error) { func SniffBittorrent(b []byte) (*SniffHeader, error) {
if len(b) < 20 { if len(b) < 20 {
return nil, core.ErrNoClue return nil, common.ErrNoClue
} }
if b[0] == 19 && string(b[1:20]) == "BitTorrent protocol" { if b[0] == 19 && string(b[1:20]) == "BitTorrent protocol" {

View File

@ -5,7 +5,7 @@ import (
"errors" "errors"
"strings" "strings"
"v2ray.com/core" "v2ray.com/core/common"
) )
type version byte type version byte
@ -48,7 +48,7 @@ func beginWithHTTPMethod(b []byte) error {
} }
if len(b) < len(m) { if len(b) < len(m) {
return core.ErrNoClue return common.ErrNoClue
} }
} }
@ -86,5 +86,5 @@ func SniffHTTP(b []byte) (*SniffHeader, error) {
return sh, nil return sh, nil
} }
return nil, core.ErrNoClue return nil, common.ErrNoClue
} }

View File

@ -4,7 +4,7 @@ import (
"errors" "errors"
"strings" "strings"
"v2ray.com/core" "v2ray.com/core/common"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
) )
@ -31,15 +31,15 @@ func IsValidTLSVersion(major, minor byte) bool {
// https://github.com/golang/go/blob/master/src/crypto/tls/handshake_messages.go#L300 // https://github.com/golang/go/blob/master/src/crypto/tls/handshake_messages.go#L300
func ReadClientHello(data []byte, h *SniffHeader) error { func ReadClientHello(data []byte, h *SniffHeader) error {
if len(data) < 42 { if len(data) < 42 {
return core.ErrNoClue return common.ErrNoClue
} }
sessionIDLen := int(data[38]) sessionIDLen := int(data[38])
if sessionIDLen > 32 || len(data) < 39+sessionIDLen { if sessionIDLen > 32 || len(data) < 39+sessionIDLen {
return core.ErrNoClue return common.ErrNoClue
} }
data = data[39+sessionIDLen:] data = data[39+sessionIDLen:]
if len(data) < 2 { if len(data) < 2 {
return core.ErrNoClue return common.ErrNoClue
} }
// cipherSuiteLen is the number of bytes of cipher suite numbers. Since // cipherSuiteLen is the number of bytes of cipher suite numbers. Since
// they are uint16s, the number must be even. // they are uint16s, the number must be even.
@ -49,11 +49,11 @@ func ReadClientHello(data []byte, h *SniffHeader) error {
} }
data = data[2+cipherSuiteLen:] data = data[2+cipherSuiteLen:]
if len(data) < 1 { if len(data) < 1 {
return core.ErrNoClue return common.ErrNoClue
} }
compressionMethodsLen := int(data[0]) compressionMethodsLen := int(data[0])
if len(data) < 1+compressionMethodsLen { if len(data) < 1+compressionMethodsLen {
return core.ErrNoClue return common.ErrNoClue
} }
data = data[1+compressionMethodsLen:] data = data[1+compressionMethodsLen:]
@ -124,7 +124,7 @@ func ReadClientHello(data []byte, h *SniffHeader) error {
func SniffTLS(b []byte) (*SniffHeader, error) { func SniffTLS(b []byte) (*SniffHeader, error) {
if len(b) < 5 { if len(b) < 5 {
return nil, core.ErrNoClue return nil, common.ErrNoClue
} }
if b[0] != 0x16 /* TLS Handshake */ { if b[0] != 0x16 /* TLS Handshake */ {
@ -135,7 +135,7 @@ func SniffTLS(b []byte) (*SniffHeader, error) {
} }
headerLen := int(serial.BytesToUint16(b[3:5])) headerLen := int(serial.BytesToUint16(b[3:5]))
if 5+headerLen > len(b) { if 5+headerLen > len(b) {
return nil, core.ErrNoClue return nil, common.ErrNoClue
} }
h := &SniffHeader{} h := &SniffHeader{}

9
common/vio/link.go Normal file
View File

@ -0,0 +1,9 @@
package vio
import "v2ray.com/core/common/buf"
// Link is a utility for connecting between an inbound and an outbound proxy handler.
type Link struct {
Reader buf.Reader
Writer buf.Writer
}

9
features/feature.go Normal file
View File

@ -0,0 +1,9 @@
package features
import "v2ray.com/core/common"
// Feature is the interface for V2Ray features. All features must implement this interface.
// All existing features have an implementation in app directory. These features can be replaced by third-party ones.
type Feature interface {
common.Runnable
}

View File

@ -0,0 +1,30 @@
package outbound
import (
"context"
"v2ray.com/core/common"
"v2ray.com/core/common/vio"
"v2ray.com/core/features"
)
// Handler is the interface for handlers that process outbound connections.
type Handler interface {
common.Runnable
Tag() string
Dispatch(ctx context.Context, link *vio.Link)
}
// HandlerManager is a feature that manages outbound.Handlers.
type HandlerManager interface {
features.Feature
// GetHandler returns an outbound.Handler for the given tag.
GetHandler(tag string) Handler
// GetDefaultHandler returns the default outbound.Handler. It is usually the first outbound.Handler specified in the configuration.
GetDefaultHandler() Handler
// AddHandler adds a handler into this outbound.HandlerManager.
AddHandler(ctx context.Context, handler Handler) error
// RemoveHandler removes a handler from outbound.HandlerManager.
RemoveHandler(ctx context.Context, tag string) error
}

View File

@ -0,0 +1,18 @@
package routing
import (
"context"
"v2ray.com/core/common/net"
"v2ray.com/core/common/vio"
"v2ray.com/core/features"
)
// Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules.
// Dispatcher is required to be registered in a V2Ray instance to make V2Ray function properly.
type Dispatcher interface {
features.Feature
// Dispatch returns a Ray for transporting data for the given request.
Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error)
}

View File

@ -0,0 +1,15 @@
package routing
import (
"context"
"v2ray.com/core/features"
)
// Router is a feature to choose an outbound tag for the given request.
type Router interface {
features.Feature
// PickRoute returns a tag of an OutboundHandler based on the given context.
PickRoute(ctx context.Context) (string, error)
}

View File

@ -6,6 +6,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/features/outbound"
) )
// InboundHandler is the interface for handlers that process inbound connections. // InboundHandler is the interface for handlers that process inbound connections.
@ -18,13 +19,6 @@ type InboundHandler interface {
GetRandomInboundProxy() (interface{}, net.Port, int) GetRandomInboundProxy() (interface{}, net.Port, int)
} }
// OutboundHandler is the interface for handlers that process outbound connections.
type OutboundHandler interface {
common.Runnable
Tag() string
Dispatch(ctx context.Context, link *Link)
}
// InboundHandlerManager is a feature that manages InboundHandlers. // InboundHandlerManager is a feature that manages InboundHandlers.
type InboundHandlerManager interface { type InboundHandlerManager interface {
Feature Feature
@ -94,77 +88,63 @@ func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) {
m.InboundHandlerManager = manager m.InboundHandlerManager = manager
} }
// OutboundHandlerManager is a feature that manages OutboundHandlers.
type OutboundHandlerManager interface {
Feature
// GetHandler returns an OutboundHandler for the given tag.
GetHandler(tag string) OutboundHandler
// GetDefaultHandler returns the default OutboundHandler. It is usually the first OutboundHandler specified in the configuration.
GetDefaultHandler() OutboundHandler
// AddHandler adds a handler into this OutboundHandlerManager.
AddHandler(ctx context.Context, handler OutboundHandler) error
// RemoveHandler removes a handler from OutboundHandlerManager.
RemoveHandler(ctx context.Context, tag string) error
}
type syncOutboundHandlerManager struct { type syncOutboundHandlerManager struct {
sync.RWMutex sync.RWMutex
OutboundHandlerManager outbound.HandlerManager
} }
func (m *syncOutboundHandlerManager) GetHandler(tag string) OutboundHandler { func (m *syncOutboundHandlerManager) GetHandler(tag string) outbound.Handler {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.OutboundHandlerManager == nil { if m.HandlerManager == nil {
return nil return nil
} }
return m.OutboundHandlerManager.GetHandler(tag) return m.HandlerManager.GetHandler(tag)
} }
func (m *syncOutboundHandlerManager) GetDefaultHandler() OutboundHandler { func (m *syncOutboundHandlerManager) GetDefaultHandler() outbound.Handler {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.OutboundHandlerManager == nil { if m.HandlerManager == nil {
return nil return nil
} }
return m.OutboundHandlerManager.GetDefaultHandler() return m.HandlerManager.GetDefaultHandler()
} }
func (m *syncOutboundHandlerManager) AddHandler(ctx context.Context, handler OutboundHandler) error { func (m *syncOutboundHandlerManager) AddHandler(ctx context.Context, handler outbound.Handler) error {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.OutboundHandlerManager == nil { if m.HandlerManager == nil {
return newError("OutboundHandlerManager not set.").AtError() return newError("OutboundHandlerManager not set.").AtError()
} }
return m.OutboundHandlerManager.AddHandler(ctx, handler) return m.HandlerManager.AddHandler(ctx, handler)
} }
func (m *syncOutboundHandlerManager) Start() error { func (m *syncOutboundHandlerManager) Start() error {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.OutboundHandlerManager == nil { if m.HandlerManager == nil {
return newError("OutboundHandlerManager not set.").AtError() return newError("OutboundHandlerManager not set.").AtError()
} }
return m.OutboundHandlerManager.Start() return m.HandlerManager.Start()
} }
func (m *syncOutboundHandlerManager) Close() error { func (m *syncOutboundHandlerManager) Close() error {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
return common.Close(m.OutboundHandlerManager) return common.Close(m.HandlerManager)
} }
func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) { func (m *syncOutboundHandlerManager) Set(manager outbound.HandlerManager) {
if manager == nil { if manager == nil {
return return
} }
@ -172,6 +152,6 @@ func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
common.Close(m.OutboundHandlerManager) // nolint: errcheck common.Close(m.HandlerManager) // nolint: errcheck
m.OutboundHandlerManager = manager m.HandlerManager = manager
} }

View File

@ -7,8 +7,8 @@ import (
"context" "context"
"time" "time"
"v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -30,7 +30,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
} }
// Process implements OutboundHandler.Dispatch(). // Process implements OutboundHandler.Dispatch().
func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { func (h *Handler) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error {
nBytes := h.response.WriteTo(link.Writer) nBytes := h.response.WriteTo(link.Writer)
if nBytes > 0 { if nBytes > 0 {
// Sleep a little here to make sure the response is sent to client. // Sleep a little here to make sure the response is sent to client.

View File

@ -13,6 +13,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -56,7 +57,7 @@ type hasHandshakeAddress interface {
HandshakeAddress() net.Address HandshakeAddress() net.Address
} }
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog(session.ExportIDToError(ctx)) newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog(session.ExportIDToError(ctx))
dest := net.Destination{ dest := net.Destination{
Network: network, Network: network,

View File

@ -15,6 +15,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -75,7 +76,7 @@ func isValidAddress(addr *net.IPOrDomain) bool {
} }
// Process implements proxy.Outbound. // Process implements proxy.Outbound.
func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { func (h *Handler) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.") return newError("target not specified.")

View File

@ -20,6 +20,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -101,7 +102,7 @@ type readerOnly struct {
io.Reader io.Reader
} }
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
reader := bufio.NewReaderSize(readerOnly{conn}, buf.Size) reader := bufio.NewReaderSize(readerOnly{conn}, buf.Size)
Start: Start:
@ -165,7 +166,7 @@ Start:
return err return err
} }
func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher core.Dispatcher) error { func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher routing.Dispatcher) error {
_, err := conn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) _, err := conn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n"))
if err != nil { if err != nil {
return newError("failed to write back OK response").Base(err) return newError("failed to write back OK response").Base(err)
@ -222,7 +223,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
var errWaitAnother = newError("keep alive") var errWaitAnother = newError("keep alive")
func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher core.Dispatcher) error { func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher routing.Dispatcher) error {
if !s.config.AllowTransparent && len(request.URL.Host) <= 0 { if !s.config.AllowTransparent && len(request.URL.Host) <= 0 {
// RFC 2068 (HTTP/1.1) requires URL to be absolute URL in HTTP proxy. // RFC 2068 (HTTP/1.1) requires URL to be absolute URL in HTTP proxy.
response := &http.Response{ response := &http.Response{

View File

@ -3,13 +3,13 @@ package mtproto
import ( import (
"context" "context"
"v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/crypto" "v2ray.com/core/common/crypto"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
) )
@ -20,7 +20,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
return &Client{}, nil return &Client{}, nil
} }
func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { func (c *Client) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("unknown destination.") return newError("unknown destination.")

View File

@ -14,6 +14,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -74,7 +75,7 @@ func isValidConnectionType(c [4]byte) bool {
return false return false
} }
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
sPolicy := s.policy.ForLevel(s.user.Level) sPolicy := s.policy.ForLevel(s.user.Level)
if err := conn.SetDeadline(time.Now().Add(sPolicy.Timeouts.Handshake)); err != nil { if err := conn.SetDeadline(time.Now().Add(sPolicy.Timeouts.Handshake)); err != nil {

View File

@ -8,9 +8,10 @@ package proxy
import ( import (
"context" "context"
"v2ray.com/core"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -20,13 +21,13 @@ type Inbound interface {
Network() net.NetworkList Network() net.NetworkList
// Process processes a connection of given network. If necessary, the Inbound can dispatch the connection to an Outbound. // Process processes a connection of given network. If necessary, the Inbound can dispatch the connection to an Outbound.
Process(context.Context, net.Network, internet.Connection, core.Dispatcher) error Process(context.Context, net.Network, internet.Connection, routing.Dispatcher) error
} }
// An Outbound process outbound connections. // An Outbound process outbound connections.
type Outbound interface { type Outbound interface {
// Process processes the given connection. The given dialer may be used to dial a system outbound connection. // Process processes the given connection. The given dialer may be used to dial a system outbound connection.
Process(context.Context, *core.Link, Dialer) error Process(context.Context, *vio.Link, Dialer) error
} }
// Dialer is used by OutboundHandler for creating outbound connections. // Dialer is used by OutboundHandler for creating outbound connections.

View File

@ -5,6 +5,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
@ -44,7 +45,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
} }
// Process implements OutboundHandler.Process(). // Process implements OutboundHandler.Process().
func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { func (c *Client) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified") return newError("target not specified")

View File

@ -13,6 +13,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/internet/udp"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -57,7 +58,7 @@ func (s *Server) Network() net.NetworkList {
return list return list
} }
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
switch network { switch network {
case net.Network_TCP: case net.Network_TCP:
return s.handleConnection(ctx, conn, dispatcher) return s.handleConnection(ctx, conn, dispatcher)
@ -68,7 +69,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
} }
} }
func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) { udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) {
request := protocol.RequestHeaderFromContext(ctx) request := protocol.RequestHeaderFromContext(ctx)
if request == nil { if request == nil {
@ -143,7 +144,7 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
return nil return nil
} }
func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level) sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level)
conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)) conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake))

View File

@ -6,6 +6,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
@ -46,7 +47,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
} }
// Process implements proxy.Outbound.Process. // Process implements proxy.Outbound.Process.
func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { func (c *Client) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.") return newError("target not specified.")

View File

@ -14,6 +14,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/internet/udp"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -58,7 +59,7 @@ func (s *Server) Network() net.NetworkList {
} }
// Process implements proxy.Inbound. // Process implements proxy.Inbound.
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
switch network { switch network {
case net.Network_TCP: case net.Network_TCP:
return s.processTCP(ctx, conn, dispatcher) return s.processTCP(ctx, conn, dispatcher)
@ -69,7 +70,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
} }
} }
func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
plcy := s.policy() plcy := s.policy()
if err := conn.SetReadDeadline(time.Now().Add(plcy.Timeouts.Handshake)); err != nil { if err := conn.SetReadDeadline(time.Now().Add(plcy.Timeouts.Handshake)); err != nil {
newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx))
@ -131,7 +132,7 @@ func (*Server) handleUDP(c io.Reader) error {
return common.Error2(io.Copy(buf.DiscardBytes, c)) return common.Error2(io.Copy(buf.DiscardBytes, c))
} }
func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher core.Dispatcher) error { func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher routing.Dispatcher) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle)
@ -172,7 +173,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
return nil return nil
} }
func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) { udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) {
newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx)) newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx))

View File

@ -20,6 +20,7 @@ import (
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/uuid" "v2ray.com/core/common/uuid"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/proxy/vmess/encoding"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
@ -213,7 +214,7 @@ func isInsecureEncryption(s protocol.SecurityType) bool {
} }
// Process implements proxy.Inbound.Process(). // Process implements proxy.Inbound.Process().
func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher core.Dispatcher) error { func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher routing.Dispatcher) error {
sessionPolicy := h.policyManager.ForLevel(0) sessionPolicy := h.policyManager.ForLevel(0)
if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil { if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {
return newError("unable to set read deadline").Base(err).AtWarning() return newError("unable to set read deadline").Base(err).AtWarning()

View File

@ -16,6 +16,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/proxy/vmess/encoding"
@ -49,7 +50,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
} }
// Process implements proxy.Outbound.Process(). // Process implements proxy.Outbound.Process().
func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { func (v *Handler) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error {
var rec *protocol.ServerSpec var rec *protocol.ServerSpec
var conn internet.Connection var conn internet.Connection

View File

@ -5,32 +5,17 @@ import (
"sync" "sync"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing"
) )
// Link is a utility for connecting between an inbound and an outbound proxy handler.
type Link struct {
Reader buf.Reader
Writer buf.Writer
}
// Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules.
// Dispatcher is required to be registered in a V2Ray instance to make V2Ray function properly.
type Dispatcher interface {
Feature
// Dispatch returns a Ray for transporting data for the given request.
Dispatch(ctx context.Context, dest net.Destination) (*Link, error)
}
type syncDispatcher struct { type syncDispatcher struct {
sync.RWMutex sync.RWMutex
Dispatcher routing.Dispatcher
} }
func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*Link, error) { func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
@ -59,7 +44,7 @@ func (d *syncDispatcher) Close() error {
return common.Close(d.Dispatcher) return common.Close(d.Dispatcher)
} }
func (d *syncDispatcher) Set(disp Dispatcher) { func (d *syncDispatcher) Set(disp routing.Dispatcher) {
if disp == nil { if disp == nil {
return return
} }
@ -71,22 +56,9 @@ func (d *syncDispatcher) Set(disp Dispatcher) {
d.Dispatcher = disp d.Dispatcher = disp
} }
var (
// ErrNoClue is for the situation that existing information is not enough to make a decision. For example, Router may return this error when there is no suitable route.
ErrNoClue = errors.New("not enough information for making a decision")
)
// Router is a feature to choose an outbound tag for the given request.
type Router interface {
Feature
// PickRoute returns a tag of an OutboundHandler based on the given context.
PickRoute(ctx context.Context) (string, error)
}
type syncRouter struct { type syncRouter struct {
sync.RWMutex sync.RWMutex
Router routing.Router
} }
func (r *syncRouter) PickRoute(ctx context.Context) (string, error) { func (r *syncRouter) PickRoute(ctx context.Context) (string, error) {
@ -94,7 +66,7 @@ func (r *syncRouter) PickRoute(ctx context.Context) (string, error) {
defer r.RUnlock() defer r.RUnlock()
if r.Router == nil { if r.Router == nil {
return "", ErrNoClue return "", common.ErrNoClue
} }
return r.Router.PickRoute(ctx) return r.Router.PickRoute(ctx)
@ -118,7 +90,7 @@ func (r *syncRouter) Close() error {
return common.Close(r.Router) return common.Close(r.Router)
} }
func (r *syncRouter) Set(router Router) { func (r *syncRouter) Set(router routing.Router) {
if router == nil { if router == nil {
return return
} }

View File

@ -5,18 +5,19 @@ import (
"sync" "sync"
"time" "time"
"v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing"
) )
type ResponseCallback func(ctx context.Context, payload *buf.Buffer) type ResponseCallback func(ctx context.Context, payload *buf.Buffer)
type connEntry struct { type connEntry struct {
link *core.Link link *vio.Link
timer signal.ActivityUpdater timer signal.ActivityUpdater
cancel context.CancelFunc cancel context.CancelFunc
} }
@ -24,11 +25,11 @@ type connEntry struct {
type Dispatcher struct { type Dispatcher struct {
sync.RWMutex sync.RWMutex
conns map[net.Destination]*connEntry conns map[net.Destination]*connEntry
dispatcher core.Dispatcher dispatcher routing.Dispatcher
callback ResponseCallback callback ResponseCallback
} }
func NewDispatcher(dispatcher core.Dispatcher, callback ResponseCallback) *Dispatcher { func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Dispatcher {
return &Dispatcher{ return &Dispatcher{
conns: make(map[net.Destination]*connEntry), conns: make(map[net.Destination]*connEntry),
dispatcher: dispatcher, dispatcher: dispatcher,

View File

@ -6,19 +6,19 @@ import (
"testing" "testing"
"time" "time"
"v2ray.com/core"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/vio"
. "v2ray.com/core/transport/internet/udp" . "v2ray.com/core/transport/internet/udp"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
. "v2ray.com/ext/assert" . "v2ray.com/ext/assert"
) )
type TestDispatcher struct { type TestDispatcher struct {
OnDispatch func(ctx context.Context, dest net.Destination) (*core.Link, error) OnDispatch func(ctx context.Context, dest net.Destination) (*vio.Link, error)
} }
func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) { func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
return d.OnDispatch(ctx, dest) return d.OnDispatch(ctx, dest)
} }
@ -50,9 +50,9 @@ func TestSameDestinationDispatching(t *testing.T) {
var count uint32 var count uint32
td := &TestDispatcher{ td := &TestDispatcher{
OnDispatch: func(ctx context.Context, dest net.Destination) (*core.Link, error) { OnDispatch: func(ctx context.Context, dest net.Destination) (*vio.Link, error) {
atomic.AddUint32(&count, 1) atomic.AddUint32(&count, 1)
return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
}, },
} }
dest := net.UDPDestination(net.LocalHostIP, 53) dest := net.UDPDestination(net.LocalHostIP, 53)

View File

@ -7,6 +7,8 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
"v2ray.com/core/common/uuid" "v2ray.com/core/common/uuid"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/routing"
) )
// Server is an instance of V2Ray. At any time, there must be at most one Server instance running. // Server is an instance of V2Ray. At any time, there must be at most one Server instance running.
@ -76,12 +78,12 @@ func New(config *Config) (*Instance, error) {
} }
} }
for _, outbound := range config.Outbound { for _, outboundConfig := range config.Outbound {
rawHandler, err := CreateObject(server, outbound) rawHandler, err := CreateObject(server, outboundConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
handler, ok := rawHandler.(OutboundHandler) handler, ok := rawHandler.(outbound.Handler)
if !ok { if !ok {
return nil, newError("not an OutboundHandler") return nil, newError("not an OutboundHandler")
} }
@ -147,14 +149,14 @@ func (s *Instance) RegisterFeature(feature interface{}, instance Feature) error
s.dnsClient.Set(instance.(DNSClient)) s.dnsClient.Set(instance.(DNSClient))
case PolicyManager, *PolicyManager: case PolicyManager, *PolicyManager:
s.policyManager.Set(instance.(PolicyManager)) s.policyManager.Set(instance.(PolicyManager))
case Router, *Router: case routing.Router, *routing.Router:
s.router.Set(instance.(Router)) s.router.Set(instance.(routing.Router))
case Dispatcher, *Dispatcher: case routing.Dispatcher, *routing.Dispatcher:
s.dispatcher.Set(instance.(Dispatcher)) s.dispatcher.Set(instance.(routing.Dispatcher))
case InboundHandlerManager, *InboundHandlerManager: case InboundHandlerManager, *InboundHandlerManager:
s.ihm.Set(instance.(InboundHandlerManager)) s.ihm.Set(instance.(InboundHandlerManager))
case OutboundHandlerManager, *OutboundHandlerManager: case outbound.HandlerManager, *outbound.HandlerManager:
s.ohm.Set(instance.(OutboundHandlerManager)) s.ohm.Set(instance.(outbound.HandlerManager))
case StatManager, *StatManager: case StatManager, *StatManager:
s.stats.Set(instance.(StatManager)) s.stats.Set(instance.(StatManager))
default: default:
@ -198,12 +200,12 @@ func (s *Instance) PolicyManager() PolicyManager {
} }
// Router returns the Router used by this Instance. The returned Router is always functional. // Router returns the Router used by this Instance. The returned Router is always functional.
func (s *Instance) Router() Router { func (s *Instance) Router() routing.Router {
return &(s.router) return &(s.router)
} }
// Dispatcher returns the Dispatcher used by this Instance. If Dispatcher was not registered before, the returned value doesn't work, although it is not nil. // Dispatcher returns the Dispatcher used by this Instance. If Dispatcher was not registered before, the returned value doesn't work, although it is not nil.
func (s *Instance) Dispatcher() Dispatcher { func (s *Instance) Dispatcher() routing.Dispatcher {
return &(s.dispatcher) return &(s.dispatcher)
} }
@ -213,7 +215,7 @@ func (s *Instance) InboundHandlerManager() InboundHandlerManager {
} }
// OutboundHandlerManager returns the OutboundHandlerManager used by this Instance. If OutboundHandlerManager was not registered before, the returned value doesn't work. // OutboundHandlerManager returns the OutboundHandlerManager used by this Instance. If OutboundHandlerManager was not registered before, the returned value doesn't work.
func (s *Instance) OutboundHandlerManager() OutboundHandlerManager { func (s *Instance) OutboundHandlerManager() outbound.HandlerManager {
return &(s.ohm) return &(s.ohm)
} }