1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-02-20 23:47:21 -05:00

option to allow passive connection

This commit is contained in:
v2ray 2016-08-12 23:37:21 +02:00
parent a43ee2f1c2
commit bcd27ba36f
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
18 changed files with 76 additions and 47 deletions

View File

@ -3,6 +3,7 @@ package dispatcher
import ( import (
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport/ray" "github.com/v2ray/v2ray-core/transport/ray"
) )
@ -12,5 +13,5 @@ const (
// PacketDispatcher dispatch a packet and possibly further network payload to its destination. // PacketDispatcher dispatch a packet and possibly further network payload to its destination.
type PacketDispatcher interface { type PacketDispatcher interface {
DispatchToOutbound(destination v2net.Destination) ray.InboundRay DispatchToOutbound(meta *proxy.InboundHandlerMeta, destination v2net.Destination) ray.InboundRay
} }

View File

@ -4,6 +4,7 @@ import (
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/proxyman" "github.com/v2ray/v2ray-core/app/proxyman"
"github.com/v2ray/v2ray-core/app/router" "github.com/v2ray/v2ray-core/app/router"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
@ -42,7 +43,7 @@ func (this *DefaultDispatcher) Release() {
} }
func (this *DefaultDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay { func (this *DefaultDispatcher) DispatchToOutbound(meta *proxy.InboundHandlerMeta, destination v2net.Destination) ray.InboundRay {
direct := ray.NewRay() direct := ray.NewRay()
dispatcher := this.ohm.GetDefaultHandler() dispatcher := this.ohm.GetDefaultHandler()
@ -59,7 +60,11 @@ func (this *DefaultDispatcher) DispatchToOutbound(destination v2net.Destination)
} }
} }
if meta.AllowPassiveConnection {
go dispatcher.Dispatch(destination, alloc.NewLocalBuffer(32).Clear(), direct)
} else {
go this.FilterPacketAndDispatch(destination, direct, dispatcher) go this.FilterPacketAndDispatch(destination, direct, dispatcher)
}
return direct return direct
} }

View File

@ -2,6 +2,7 @@ package testing
import ( import (
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport/ray" "github.com/v2ray/v2ray-core/transport/ray"
) )
@ -29,7 +30,7 @@ func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic
} }
} }
func (this *TestPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay { func (this *TestPacketDispatcher) DispatchToOutbound(meta *proxy.InboundHandlerMeta, destination v2net.Destination) ray.InboundRay {
traffic := ray.NewRay() traffic := ray.NewRay()
this.Destination <- destination this.Destination <- destination
go this.Handler(destination, traffic) go this.Handler(destination, traffic)

View File

@ -10,6 +10,7 @@ import (
"github.com/v2ray/v2ray-core/common/dice" "github.com/v2ray/v2ray-core/common/dice"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport/internet/udp" "github.com/v2ray/v2ray-core/transport/internet/udp"
"github.com/miekg/dns" "github.com/miekg/dns"
@ -51,7 +52,9 @@ func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDis
s := &UDPNameServer{ s := &UDPNameServer{
address: address, address: address,
requests: make(map[uint16]*PendingRequest), requests: make(map[uint16]*PendingRequest),
udpServer: udp.NewUDPServer(dispatcher), udpServer: udp.NewUDPServer(&proxy.InboundHandlerMeta{
AllowPassiveConnection: false,
}, dispatcher),
} }
return s return s
} }

View File

@ -89,7 +89,7 @@ func (this *DokodemoDoor) Start() error {
} }
func (this *DokodemoDoor) ListenUDP() error { func (this *DokodemoDoor) ListenUDP() error {
this.udpServer = udp.NewUDPServer(this.packetDispatcher) this.udpServer = udp.NewUDPServer(this.meta, this.packetDispatcher)
udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets) udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets)
if err != nil { if err != nil {
log.Error("Dokodemo failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err) log.Error("Dokodemo failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err)
@ -148,7 +148,7 @@ func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
} }
log.Info("Dokodemo: Handling request to ", dest) log.Info("Dokodemo: Handling request to ", dest)
ray := this.packetDispatcher.DispatchToOutbound(dest) ray := this.packetDispatcher.DispatchToOutbound(this.meta, dest)
defer ray.InboundOutput().Release() defer ray.InboundOutput().Release()
var inputFinish, outputFinish sync.Mutex var inputFinish, outputFinish sync.Mutex

View File

@ -140,7 +140,7 @@ func (this *Server) handleConnect(request *http.Request, destination v2net.Desti
} }
response.Write(writer) response.Write(writer)
ray := this.packetDispatcher.DispatchToOutbound(destination) ray := this.packetDispatcher.DispatchToOutbound(this.meta, destination)
this.transport(reader, writer, ray) this.transport(reader, writer, ray)
} }
@ -220,7 +220,7 @@ func (this *Server) handlePlainHTTP(request *http.Request, dest v2net.Destinatio
request.Host = request.URL.Host request.Host = request.URL.Host
StripHopByHopHeaders(request) StripHopByHopHeaders(request)
ray := this.packetDispatcher.DispatchToOutbound(dest) ray := this.packetDispatcher.DispatchToOutbound(this.meta, dest)
defer ray.InboundInput().Close() defer ray.InboundInput().Close()
defer ray.InboundOutput().Release() defer ray.InboundOutput().Release()

View File

@ -19,6 +19,7 @@ type InboundHandlerMeta struct {
Tag string Tag string
Address v2net.Address Address v2net.Address
Port v2net.Port Port v2net.Port
AllowPassiveConnection bool
StreamSettings *internet.StreamSettings StreamSettings *internet.StreamSettings
} }

View File

@ -70,7 +70,7 @@ func (this *Server) Start() error {
this.tcpHub = tcpHub this.tcpHub = tcpHub
if this.config.UDP { if this.config.UDP {
this.udpServer = udp.NewUDPServer(this.packetDispatcher) this.udpServer = udp.NewUDPServer(this.meta, this.packetDispatcher)
udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload) udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload)
if err != nil { if err != nil {
log.Error("Shadowsocks: Failed to listen UDP on ", this.meta.Address, ":", this.meta.Port, ": ", err) log.Error("Shadowsocks: Failed to listen UDP on ", this.meta.Address, ":", this.meta.Port, ": ", err)
@ -204,7 +204,7 @@ func (this *Server) handleConnection(conn internet.Connection) {
log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "") log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "")
log.Info("Shadowsocks: Tunnelling request to ", dest) log.Info("Shadowsocks: Tunnelling request to ", dest)
ray := this.packetDispatcher.DispatchToOutbound(dest) ray := this.packetDispatcher.DispatchToOutbound(this.meta, dest)
defer ray.InboundOutput().Release() defer ray.InboundOutput().Release()
var writeFinish sync.Mutex var writeFinish sync.Mutex

View File

@ -283,7 +283,7 @@ func (this *Server) handleSocks4(clientAddr string, reader *v2io.BufferedReader,
} }
func (this *Server) transport(reader io.Reader, writer io.Writer, destination v2net.Destination) { func (this *Server) transport(reader io.Reader, writer io.Writer, destination v2net.Destination) {
ray := this.packetDispatcher.DispatchToOutbound(destination) ray := this.packetDispatcher.DispatchToOutbound(this.meta, destination)
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()

View File

@ -9,7 +9,7 @@ import (
) )
func (this *Server) listenUDP() error { func (this *Server) listenUDP() error {
this.udpServer = udp.NewUDPServer(this.packetDispatcher) this.udpServer = udp.NewUDPServer(this.meta, this.packetDispatcher)
udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload) udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload)
if err != nil { if err != nil {
log.Error("Socks: Failed to listen on udp ", this.meta.Address, ":", this.meta.Port) log.Error("Socks: Failed to listen on udp ", this.meta.Address, ":", this.meta.Port)

View File

@ -7,6 +7,7 @@ import (
"github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/app/dispatcher"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
) )
type InboundConnectionHandler struct { type InboundConnectionHandler struct {
@ -30,7 +31,9 @@ func (this *InboundConnectionHandler) Close() {
} }
func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error { func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error {
ray := this.PacketDispatcher.DispatchToOutbound(destination) ray := this.PacketDispatcher.DispatchToOutbound(&proxy.InboundHandlerMeta{
AllowPassiveConnection: false,
}, destination)
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()

View File

@ -163,7 +163,7 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection
connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
ray := this.packetDispatcher.DispatchToOutbound(request.Destination()) ray := this.packetDispatcher.DispatchToOutbound(this.meta, request.Destination())
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()
defer input.Close() defer input.Close()

View File

@ -15,6 +15,7 @@ type InboundConnectionConfig struct {
StreamSettings *internet.StreamSettings StreamSettings *internet.StreamSettings
Protocol string Protocol string
Settings []byte Settings []byte
AllowPassiveConnection bool
} }
type OutboundConnectionConfig struct { type OutboundConnectionConfig struct {
@ -50,6 +51,7 @@ type InboundDetourConfig struct {
Allocation *InboundDetourAllocationConfig Allocation *InboundDetourAllocationConfig
StreamSettings *internet.StreamSettings StreamSettings *internet.StreamSettings
Settings []byte Settings []byte
AllowPassiveConnection bool
} }
type OutboundDetourConfig struct { type OutboundDetourConfig struct {

View File

@ -71,6 +71,7 @@ func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error {
Protocol string `json:"protocol"` Protocol string `json:"protocol"`
StreamSetting *internet.StreamSettings `json:"streamSettings"` StreamSetting *internet.StreamSettings `json:"streamSettings"`
Settings json.RawMessage `json:"settings"` Settings json.RawMessage `json:"settings"`
AllowPassive bool `json:"allowPassive"`
} }
jsonConfig := new(JsonConfig) jsonConfig := new(JsonConfig)
@ -91,6 +92,7 @@ func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error {
this.Protocol = jsonConfig.Protocol this.Protocol = jsonConfig.Protocol
this.Settings = jsonConfig.Settings this.Settings = jsonConfig.Settings
this.AllowPassiveConnection = jsonConfig.AllowPassive
return nil return nil
} }
@ -186,6 +188,7 @@ func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error {
Tag string `json:"tag"` Tag string `json:"tag"`
Allocation *InboundDetourAllocationConfig `json:"allocate"` Allocation *InboundDetourAllocationConfig `json:"allocate"`
StreamSetting *internet.StreamSettings `json:"streamSettings"` StreamSetting *internet.StreamSettings `json:"streamSettings"`
AllowPassive bool `json:"allowPassive"`
} }
jsonConfig := new(JsonInboundDetourConfig) jsonConfig := new(JsonInboundDetourConfig)
if err := json.Unmarshal(data, jsonConfig); err != nil { if err := json.Unmarshal(data, jsonConfig); err != nil {
@ -216,6 +219,7 @@ func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error {
if jsonConfig.StreamSetting != nil { if jsonConfig.StreamSetting != nil {
this.StreamSettings = jsonConfig.StreamSetting this.StreamSettings = jsonConfig.StreamSetting
} }
this.AllowPassiveConnection = jsonConfig.AllowPassive
return nil return nil
} }

View File

@ -29,7 +29,9 @@ func NewInboundDetourHandlerAlways(space app.Space, config *InboundDetourConfig)
Address: config.ListenOn, Address: config.ListenOn,
Port: i, Port: i,
Tag: config.Tag, Tag: config.Tag,
StreamSettings: config.StreamSettings}) StreamSettings: config.StreamSettings,
AllowPassiveConnection: config.AllowPassiveConnection,
})
if err != nil { if err != nil {
log.Error("Failed to create inbound connection handler: ", err) log.Error("Failed to create inbound connection handler: ", err)
return nil, err return nil, err

View File

@ -36,7 +36,9 @@ func NewInboundDetourHandlerDynamic(space app.Space, config *InboundDetourConfig
Address: config.ListenOn, Address: config.ListenOn,
Port: 0, Port: 0,
Tag: config.Tag, Tag: config.Tag,
StreamSettings: config.StreamSettings}) StreamSettings: config.StreamSettings,
AllowPassiveConnection: config.AllowPassiveConnection,
})
if err != nil { if err != nil {
log.Error("Point: Failed to create inbound connection handler: ", err) log.Error("Point: Failed to create inbound connection handler: ", err)
return nil, err return nil, err

View File

@ -96,7 +96,9 @@ func NewPoint(pConfig *Config) (*Point, error) {
Tag: "system.inbound", Tag: "system.inbound",
Address: pConfig.InboundConfig.ListenOn, Address: pConfig.InboundConfig.ListenOn,
Port: vpoint.port, Port: vpoint.port,
StreamSettings: pConfig.InboundConfig.StreamSettings}) StreamSettings: pConfig.InboundConfig.StreamSettings,
AllowPassiveConnection: pConfig.InboundConfig.AllowPassiveConnection,
})
if err != nil { if err != nil {
log.Error("Failed to create inbound connection handler: ", err) log.Error("Failed to create inbound connection handler: ", err)
return nil, err return nil, err

View File

@ -8,6 +8,7 @@ import (
"github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport/ray" "github.com/v2ray/v2ray-core/transport/ray"
) )
@ -95,12 +96,14 @@ type UDPServer struct {
sync.RWMutex sync.RWMutex
conns map[string]*TimedInboundRay conns map[string]*TimedInboundRay
packetDispatcher dispatcher.PacketDispatcher packetDispatcher dispatcher.PacketDispatcher
meta *proxy.InboundHandlerMeta
} }
func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer { func NewUDPServer(meta *proxy.InboundHandlerMeta, packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
return &UDPServer{ return &UDPServer{
conns: make(map[string]*TimedInboundRay), conns: make(map[string]*TimedInboundRay),
packetDispatcher: packetDispatcher, packetDispatcher: packetDispatcher,
meta: meta,
} }
} }
@ -137,7 +140,7 @@ func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Dest
} }
log.Info("UDP Server: establishing new connection for ", destString) log.Info("UDP Server: establishing new connection for ", destString)
inboundRay := this.packetDispatcher.DispatchToOutbound(destination) inboundRay := this.packetDispatcher.DispatchToOutbound(this.meta, destination)
timedInboundRay := NewTimedInboundRay(destString, inboundRay, this) timedInboundRay := NewTimedInboundRay(destString, inboundRay, this)
outputStream := timedInboundRay.InboundInput() outputStream := timedInboundRay.InboundInput()
if outputStream != nil { if outputStream != nil {