1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-21 17:46:58 -05:00

move link to transport

This commit is contained in:
Darien Raymond 2018-11-03 12:36:29 +01:00
parent 25e7fa3ade
commit 128a90b98b
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
24 changed files with 67 additions and 68 deletions

View File

@ -7,7 +7,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/common/vio" "v2ray.com/core/transport"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -69,7 +69,7 @@ type Outbound struct {
} }
// Dispatch implements outbound.Handler. // Dispatch implements outbound.Handler.
func (co *Outbound) Dispatch(ctx context.Context, link *vio.Link) { func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
co.access.RLock() co.access.RLock()
if co.closed { if co.closed {

View File

@ -20,6 +20,7 @@ import (
"v2ray.com/core/features/policy" "v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/features/stats" "v2ray.com/core/features/stats"
"v2ray.com/core/transport"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -124,17 +125,17 @@ func (*DefaultDispatcher) Start() error {
// Close implements common.Closable. // Close implements common.Closable.
func (*DefaultDispatcher) Close() error { return nil } func (*DefaultDispatcher) Close() error { return nil }
func (d *DefaultDispatcher) getLink(ctx context.Context) (*vio.Link, *vio.Link) { func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
opt := pipe.OptionsFromContext(ctx) opt := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opt...) uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...) downlinkReader, downlinkWriter := pipe.New(opt...)
inboundLink := &vio.Link{ inboundLink := &transport.Link{
Reader: downlinkReader, Reader: downlinkReader,
Writer: uplinkWriter, Writer: uplinkWriter,
} }
outboundLink := &vio.Link{ outboundLink := &transport.Link{
Reader: uplinkReader, Reader: uplinkReader,
Writer: downlinkWriter, Writer: downlinkWriter,
} }
@ -180,7 +181,7 @@ func shouldOverride(result SniffResult, domainOverride []string) bool {
} }
// Dispatch implements routing.Dispatcher. // Dispatch implements routing.Dispatcher.
func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*vio.Link, error) { func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*transport.Link, error) {
if !destination.IsValid() { if !destination.IsValid() {
panic("Dispatcher: Invalid destination.") panic("Dispatcher: Invalid destination.")
} }
@ -245,7 +246,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) {
} }
} }
func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *vio.Link, destination net.Destination) { func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) {
dispatcher := d.ohm.GetDefaultHandler() dispatcher := d.ohm.GetDefaultHandler()
if d.router != nil { if d.router != nil {
if tag, err := d.router.PickRoute(ctx); err == nil { if tag, err := d.router.PickRoute(ctx); err == nil {

View File

@ -9,9 +9,9 @@ import (
"v2ray.com/core/common/mux" "v2ray.com/core/common/mux"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound" "v2ray.com/core/features/outbound"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -96,7 +96,7 @@ func (h *Handler) Tag() string {
} }
// Dispatch implements proxy.Outbound.Dispatch. // Dispatch implements proxy.Outbound.Dispatch.
func (h *Handler) Dispatch(ctx context.Context, link *vio.Link) { func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
if h.mux != nil { if h.mux != nil {
if err := h.mux.Dispatch(ctx, link); err != nil { if err := h.mux.Dispatch(ctx, link); err != nil {
newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
@ -130,7 +130,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
uplinkReader, uplinkWriter := pipe.New(opts...) uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...) downlinkReader, downlinkWriter := pipe.New(opts...)
go handler.Dispatch(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}) go handler.Dispatch(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter})
return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil
} }

View File

@ -9,8 +9,8 @@ import (
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -144,7 +144,7 @@ func (w *BridgeWorker) Connections() uint32 {
return w.worker.ActiveConnections() return w.worker.ActiveConnections()
} }
func (w *BridgeWorker) handleInternalConn(link vio.Link) { func (w *BridgeWorker) handleInternalConn(link transport.Link) {
go func() { go func() {
reader := link.Reader reader := link.Reader
for { for {
@ -166,7 +166,7 @@ func (w *BridgeWorker) handleInternalConn(link vio.Link) {
}() }()
} }
func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
if !isInternalDomain(dest) { if !isInternalDomain(dest) {
ctx = session.ContextWithInbound(ctx, &session.Inbound{ ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.tag, Tag: w.tag,
@ -178,12 +178,12 @@ func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*vio
uplinkReader, uplinkWriter := pipe.New(opt...) uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...) downlinkReader, downlinkWriter := pipe.New(opt...)
w.handleInternalConn(vio.Link{ w.handleInternalConn(transport.Link{
Reader: downlinkReader, Reader: downlinkReader,
Writer: uplinkWriter, Writer: uplinkWriter,
}) })
return &vio.Link{ return &transport.Link{
Reader: uplinkReader, Reader: uplinkReader,
Writer: downlinkWriter, Writer: downlinkWriter,
}, nil }, nil

View File

@ -12,8 +12,8 @@ import (
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound" "v2ray.com/core/features/outbound"
"v2ray.com/core/transport"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -61,7 +61,7 @@ func (p *Portal) Close() error {
return p.ohm.RemoveHandler(context.Background(), p.tag) return p.ohm.RemoveHandler(context.Background(), p.tag)
} }
func (s *Portal) HandleConnection(ctx context.Context, link *vio.Link) error { func (s *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
outboundMeta := session.OutboundFromContext(ctx) outboundMeta := session.OutboundFromContext(ctx)
if outboundMeta == nil { if outboundMeta == nil {
return newError("outbound metadata not found").AtError() return newError("outbound metadata not found").AtError()
@ -94,7 +94,7 @@ func (o *Outbound) Tag() string {
return o.tag return o.tag
} }
func (o *Outbound) Dispatch(ctx context.Context, link *vio.Link) { func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
if err := o.portal.HandleConnection(ctx, link); err != nil { if err := o.portal.HandleConnection(ctx, link); err != nil {
newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx)) newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
pipe.CloseError(link.Writer) pipe.CloseError(link.Writer)
@ -206,7 +206,7 @@ func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
ctx = session.ContextWithOutbound(ctx, &session.Outbound{ ctx = session.ContextWithOutbound(ctx, &session.Outbound{
Target: net.UDPDestination(net.DomainAddress(internalDomain), 0), Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
}) })
f := client.Dispatch(ctx, &vio.Link{ f := client.Dispatch(ctx, &transport.Link{
Reader: uplinkReader, Reader: uplinkReader,
Writer: downlinkWriter, Writer: downlinkWriter,
}) })

View File

@ -372,15 +372,12 @@ type RoutingRule struct {
// List of GeoIPs for target IP address matching. If this entry exists, the cidr above will have no effect. // List of GeoIPs for target IP address matching. If this entry exists, the cidr above will have no effect.
// GeoIP fields with the same country code are supposed to contain exactly same content. They will be merged during runtime. // GeoIP fields with the same country code are supposed to contain exactly same content. They will be merged during runtime.
// For customized GeoIPs, please leave country code empty. // For customized GeoIPs, please leave country code empty.
// The CIDR list in the GeoIP must be sorted beforehand.
Geoip []*GeoIP `protobuf:"bytes,10,rep,name=geoip,proto3" json:"geoip,omitempty"` Geoip []*GeoIP `protobuf:"bytes,10,rep,name=geoip,proto3" json:"geoip,omitempty"`
PortRange *net.PortRange `protobuf:"bytes,4,opt,name=port_range,json=portRange,proto3" json:"port_range,omitempty"` PortRange *net.PortRange `protobuf:"bytes,4,opt,name=port_range,json=portRange,proto3" json:"port_range,omitempty"`
NetworkList *net.NetworkList `protobuf:"bytes,5,opt,name=network_list,json=networkList,proto3" json:"network_list,omitempty"` NetworkList *net.NetworkList `protobuf:"bytes,5,opt,name=network_list,json=networkList,proto3" json:"network_list,omitempty"`
// List of CIDRs for source IP address matching. // List of CIDRs for source IP address matching.
// The list must be sorted beforehand.
SourceCidr []*CIDR `protobuf:"bytes,6,rep,name=source_cidr,json=sourceCidr,proto3" json:"source_cidr,omitempty"` // Deprecated: Do not use. SourceCidr []*CIDR `protobuf:"bytes,6,rep,name=source_cidr,json=sourceCidr,proto3" json:"source_cidr,omitempty"` // Deprecated: Do not use.
// List of GeoIPs for source IP address matching. If this entry exists, the source_cidr above will have no effect. // List of GeoIPs for source IP address matching. If this entry exists, the source_cidr above will have no effect.
// The CIDR list in the GeoIP must be sorted beforehand.
SourceGeoip []*GeoIP `protobuf:"bytes,11,rep,name=source_geoip,json=sourceGeoip,proto3" json:"source_geoip,omitempty"` SourceGeoip []*GeoIP `protobuf:"bytes,11,rep,name=source_geoip,json=sourceGeoip,proto3" json:"source_geoip,omitempty"`
UserEmail []string `protobuf:"bytes,7,rep,name=user_email,json=userEmail,proto3" json:"user_email,omitempty"` UserEmail []string `protobuf:"bytes,7,rep,name=user_email,json=userEmail,proto3" json:"user_email,omitempty"`
InboundTag []string `protobuf:"bytes,8,rep,name=inbound_tag,json=inboundTag,proto3" json:"inbound_tag,omitempty"` InboundTag []string `protobuf:"bytes,8,rep,name=inbound_tag,json=inboundTag,proto3" json:"inbound_tag,omitempty"`

View File

@ -14,8 +14,8 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -24,7 +24,7 @@ type ClientManager struct {
Picker WorkerPicker Picker WorkerPicker
} }
func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error { func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) error {
for { for {
worker, err := m.Picker.PickAvailable() worker, err := m.Picker.PickAvailable()
if err != nil { if err != nil {
@ -114,7 +114,7 @@ func (p *IncrementalWorkerPicker) pickInternal() (*ClientWorker, error, bool) {
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) { func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
worker, err, start := p.pickInternal() worker, err, start := p.pickInternal()
if start { if start {
p.cleanupTask.Start() common.Must(p.cleanupTask.Start())
} }
return worker, err return worker, err
@ -135,7 +135,7 @@ func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
uplinkReader, upLinkWriter := pipe.New(opts...) uplinkReader, upLinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...) downlinkReader, downlinkWriter := pipe.New(opts...)
c, err := NewClientWorker(vio.Link{ c, err := NewClientWorker(transport.Link{
Reader: downlinkReader, Reader: downlinkReader,
Writer: upLinkWriter, Writer: upLinkWriter,
}, f.Strategy) }, f.Strategy)
@ -150,7 +150,7 @@ func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
}) })
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
if err := p.Process(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}, d); err != nil { if err := p.Process(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}, d); err != nil {
errors.New("failed to handler mux client connection").Base(err).WriteToLog() errors.New("failed to handler mux client connection").Base(err).WriteToLog()
} }
common.Must(c.Close()) common.Must(c.Close())
@ -167,7 +167,7 @@ type ClientStrategy struct {
type ClientWorker struct { type ClientWorker struct {
sessionManager *SessionManager sessionManager *SessionManager
link vio.Link link transport.Link
done *done.Instance done *done.Instance
strategy ClientStrategy strategy ClientStrategy
} }
@ -176,7 +176,7 @@ var muxCoolAddress = net.DomainAddress("v1.mux.cool")
var muxCoolPort = net.Port(9527) var muxCoolPort = net.Port(9527)
// NewClientWorker creates a new mux.Client. // NewClientWorker creates a new mux.Client.
func NewClientWorker(stream vio.Link, s ClientStrategy) (*ClientWorker, error) { func NewClientWorker(stream transport.Link, s ClientStrategy) (*ClientWorker, error) {
c := &ClientWorker{ c := &ClientWorker{
sessionManager: NewSessionManager(), sessionManager: NewSessionManager(),
link: stream, link: stream,
@ -283,7 +283,7 @@ func (m *ClientWorker) IsFull() bool {
return false return false
} }
func (m *ClientWorker) Dispatch(ctx context.Context, link *vio.Link) bool { func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool {
if m.IsFull() || m.Closed() { if m.IsFull() || m.Closed() {
return false return false
} }

View File

@ -35,7 +35,7 @@ func TestClientWorkerEOF(t *testing.T) {
reader, writer := pipe.New(pipe.WithoutSizeLimit()) reader, writer := pipe.New(pipe.WithoutSizeLimit())
common.Must(writer.Close()) common.Must(writer.Close())
worker, err := mux.NewClientWorker(vio.Link{Reader: reader, Writer: writer}, mux.ClientStrategy{}) worker, err := mux.NewClientWorker(transport.Link{Reader: reader, Writer: writer}, mux.ClientStrategy{})
common.Must(err) common.Must(err)
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)

View File

@ -11,8 +11,8 @@ import (
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -35,7 +35,7 @@ func (s *Server) Type() interface{} {
} }
// Dispatch impliments routing.Dispatcher // Dispatch impliments routing.Dispatcher
func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
if dest.Address != muxCoolAddress { if dest.Address != muxCoolAddress {
return s.dispatcher.Dispatch(ctx, dest) return s.dispatcher.Dispatch(ctx, dest)
} }
@ -44,7 +44,7 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link,
uplinkReader, uplinkWriter := pipe.New(opts...) uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...) downlinkReader, downlinkWriter := pipe.New(opts...)
_, err := NewServerWorker(ctx, s.dispatcher, &vio.Link{ _, err := NewServerWorker(ctx, s.dispatcher, &transport.Link{
Reader: uplinkReader, Reader: uplinkReader,
Writer: downlinkWriter, Writer: downlinkWriter,
}) })
@ -52,7 +52,7 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link,
return nil, err return nil, err
} }
return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil return &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
} }
// Start implements common.Runnable. // Start implements common.Runnable.
@ -67,11 +67,11 @@ func (s *Server) Close() error {
type ServerWorker struct { type ServerWorker struct {
dispatcher routing.Dispatcher dispatcher routing.Dispatcher
link *vio.Link link *transport.Link
sessionManager *SessionManager sessionManager *SessionManager
} }
func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *vio.Link) (*ServerWorker, error) { func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *transport.Link) (*ServerWorker, error) {
worker := &ServerWorker{ worker := &ServerWorker{
dispatcher: d, dispatcher: d,
link: link, link: link,

View File

@ -2,7 +2,7 @@ package net
import "net" import "net"
// DialTCP is an injectable function. Default to net.DialTCP // DialTCP is an alias of net.DialTCP.
var DialTCP = net.DialTCP var DialTCP = net.DialTCP
var DialUDP = net.DialUDP var DialUDP = net.DialUDP
var DialUnix = net.DialUnix var DialUnix = net.DialUnix
@ -19,6 +19,7 @@ var LookupIP = net.LookupIP
var FileConn = net.FileConn var FileConn = net.FileConn
// ParseIP is an alias of net.ParseIP
var ParseIP = net.ParseIP var ParseIP = net.ParseIP
var SplitHostPort = net.SplitHostPort var SplitHostPort = net.SplitHostPort

View File

@ -4,15 +4,15 @@ import (
"context" "context"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/vio"
"v2ray.com/core/features" "v2ray.com/core/features"
"v2ray.com/core/transport"
) )
// Handler is the interface for handlers that process outbound connections. // Handler is the interface for handlers that process outbound connections.
type Handler interface { type Handler interface {
common.Runnable common.Runnable
Tag() string Tag() string
Dispatch(ctx context.Context, link *vio.Link) Dispatch(ctx context.Context, link *transport.Link)
} }
// Manager is a feature that manages outbound.Handlers. // Manager is a feature that manages outbound.Handlers.

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/vio"
"v2ray.com/core/features" "v2ray.com/core/features"
"v2ray.com/core/transport"
) )
// Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules. // Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules.
@ -14,7 +14,7 @@ type Dispatcher interface {
features.Feature features.Feature
// Dispatch returns a Ray for transporting data for the given request. // Dispatch returns a Ray for transporting data for the given request.
Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error)
} }
// DispatcherType returns the type of Dispatcher interface. Can be used to implement common.HasType. // DispatcherType returns the type of Dispatcher interface. Can be used to implement common.HasType.

View File

@ -8,7 +8,7 @@ import (
"time" "time"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/vio" "v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -30,7 +30,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
} }
// Process implements OutboundHandler.Dispatch(). // Process implements OutboundHandler.Dispatch().
func (h *Handler) Process(ctx context.Context, link *vio.Link, dialer internet.Dialer) error { func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
nBytes := h.response.WriteTo(link.Writer) nBytes := h.response.WriteTo(link.Writer)
if nBytes > 0 { if nBytes > 0 {
// Sleep a little here to make sure the response is sent to client. // Sleep a little here to make sure the response is sent to client.

View File

@ -7,8 +7,8 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy/blackhole" "v2ray.com/core/proxy/blackhole"
"v2ray.com/core/transport"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
@ -28,7 +28,7 @@ func TestBlackholeHTTPResponse(t *testing.T) {
rerr = e rerr = e
}() }()
link := vio.Link{ link := transport.Link{
Reader: reader, Reader: reader,
Writer: writer, Writer: writer,
} }

View File

@ -15,9 +15,9 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/dns" "v2ray.com/core/features/dns"
"v2ray.com/core/features/policy" "v2ray.com/core/features/policy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -78,7 +78,7 @@ func isValidAddress(addr *net.IPOrDomain) bool {
} }
// Process implements proxy.Outbound. // Process implements proxy.Outbound.
func (h *Handler) Process(ctx context.Context, link *vio.Link, dialer internet.Dialer) error { func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.") return newError("target not specified.")

View File

@ -9,7 +9,7 @@ import (
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio" "v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -20,7 +20,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
return &Client{}, nil return &Client{}, nil
} }
func (c *Client) Process(ctx context.Context, link *vio.Link, dialer internet.Dialer) error { func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("unknown destination.") return newError("unknown destination.")

View File

@ -10,8 +10,8 @@ import (
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -27,7 +27,7 @@ type Inbound interface {
// An Outbound process outbound connections. // An Outbound process outbound connections.
type Outbound interface { type Outbound interface {
// Process processes the given connection. The given dialer may be used to dial a system outbound connection. // Process processes the given connection. The given dialer may be used to dial a system outbound connection.
Process(context.Context, *vio.Link, internet.Dialer) error Process(context.Context, *transport.Link, internet.Dialer) error
} }
// UserManager is the interface for Inbounds and Outbounds that can manage their users. // UserManager is the interface for Inbounds and Outbounds that can manage their users.

View File

@ -12,8 +12,8 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/policy" "v2ray.com/core/features/policy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -46,7 +46,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
} }
// Process implements OutboundHandler.Process(). // Process implements OutboundHandler.Process().
func (c *Client) Process(ctx context.Context, link *vio.Link, dialer internet.Dialer) error { func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified") return newError("target not specified")

View File

@ -13,8 +13,8 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/policy" "v2ray.com/core/features/policy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -46,7 +46,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
} }
// Process implements proxy.Outbound.Process. // Process implements proxy.Outbound.Process.
func (c *Client) Process(ctx context.Context, link *vio.Link, dialer internet.Dialer) error { func (c *Client) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx) outbound := session.OutboundFromContext(ctx)
if outbound == nil || !outbound.Target.IsValid() { if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.") return newError("target not specified.")

View File

@ -16,10 +16,10 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/policy" "v2ray.com/core/features/policy"
"v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/proxy/vmess/encoding"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
@ -52,7 +52,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
} }
// Process implements proxy.Outbound.Process(). // Process implements proxy.Outbound.Process().
func (v *Handler) Process(ctx context.Context, link *vio.Link, dialer internet.Dialer) error { func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
var rec *protocol.ServerSpec var rec *protocol.ServerSpec
var conn internet.Connection var conn internet.Connection

View File

@ -9,8 +9,8 @@ import (
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
reflect "reflect" reflect "reflect"
net "v2ray.com/core/common/net" net "v2ray.com/core/common/net"
vio "v2ray.com/core/common/vio"
routing "v2ray.com/core/features/routing" routing "v2ray.com/core/features/routing"
transport "v2ray.com/core/transport"
internet "v2ray.com/core/transport/internet" internet "v2ray.com/core/transport/internet"
) )
@ -85,7 +85,7 @@ func (m *ProxyOutbound) EXPECT() *ProxyOutboundMockRecorder {
} }
// Process mocks base method // Process mocks base method
func (m *ProxyOutbound) Process(arg0 context.Context, arg1 *vio.Link, arg2 internet.Dialer) error { func (m *ProxyOutbound) Process(arg0 context.Context, arg1 *transport.Link, arg2 internet.Dialer) error {
ret := m.ctrl.Call(m, "Process", arg0, arg1, arg2) ret := m.ctrl.Call(m, "Process", arg0, arg1, arg2)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0

View File

@ -10,14 +10,14 @@ import (
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport"
) )
type ResponseCallback func(ctx context.Context, payload *buf.Buffer) type ResponseCallback func(ctx context.Context, payload *buf.Buffer)
type connEntry struct { type connEntry struct {
link *vio.Link link *transport.Link
timer signal.ActivityUpdater timer signal.ActivityUpdater
cancel context.CancelFunc cancel context.CancelFunc
} }

View File

@ -8,18 +8,18 @@ import (
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport"
. "v2ray.com/core/transport/internet/udp" . "v2ray.com/core/transport/internet/udp"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
. "v2ray.com/ext/assert" . "v2ray.com/ext/assert"
) )
type TestDispatcher struct { type TestDispatcher struct {
OnDispatch func(ctx context.Context, dest net.Destination) (*vio.Link, error) OnDispatch func(ctx context.Context, dest net.Destination) (*transport.Link, error)
} }
func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
return d.OnDispatch(ctx, dest) return d.OnDispatch(ctx, dest)
} }
@ -55,9 +55,9 @@ func TestSameDestinationDispatching(t *testing.T) {
var count uint32 var count uint32
td := &TestDispatcher{ td := &TestDispatcher{
OnDispatch: func(ctx context.Context, dest net.Destination) (*vio.Link, error) { OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) {
atomic.AddUint32(&count, 1) atomic.AddUint32(&count, 1)
return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil return &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
}, },
} }
dest := net.UDPDestination(net.LocalHostIP, 53) dest := net.UDPDestination(net.LocalHostIP, 53)

View File

@ -1,4 +1,4 @@
package vio package transport
import "v2ray.com/core/common/buf" import "v2ray.com/core/common/buf"