1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-30 05:56:54 -05:00

Merge pull request #238 from xiaokangwang/websocket

Websocket transport support for v2ray
This commit is contained in:
Darien Raymond 2016-08-15 17:45:25 +02:00 committed by GitHub
commit 667b71aad0
23 changed files with 1139 additions and 6 deletions

View File

@ -15,6 +15,9 @@ const (
// KCPNetwork represents the KCP network.
KCPNetwork = Network("kcp")
// WSNetwork represents the Websocket over HTTP network.
WSNetwork = Network("ws")
)
// Network represents a communication network on internet.

View File

@ -245,7 +245,7 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection
type Factory struct{}
func (this *Factory) StreamCapability() internet.StreamConnectionType {
return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP | internet.StreamConnectionTypeKCP
return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP | internet.StreamConnectionTypeKCP | internet.StreamConnectionTypeWebSocket
}
func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {

View File

@ -160,7 +160,7 @@ func (this *VMessOutboundHandler) handleResponse(session *encoding.ClientSession
type Factory struct{}
func (this *Factory) StreamCapability() internet.StreamConnectionType {
return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP | internet.StreamConnectionTypeKCP
return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP | internet.StreamConnectionTypeKCP | internet.StreamConnectionTypeWebSocket
}
func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {

View File

@ -26,6 +26,7 @@ import (
_ "github.com/v2ray/v2ray-core/transport/internet/kcp"
_ "github.com/v2ray/v2ray-core/transport/internet/tcp"
_ "github.com/v2ray/v2ray-core/transport/internet/udp"
_ "github.com/v2ray/v2ray-core/transport/internet/ws"
_ "github.com/v2ray/v2ray-core/transport/internet/authenticators/noop"
_ "github.com/v2ray/v2ray-core/transport/internet/authenticators/srtp"

15
testing/tls/cert.pem Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICSTCCAfOgAwIBAgIJAKvQIEezrxBNMA0GCSqGSIb3DQEBCwUAMH8xCzAJBgNV
BAYTAlYyMRAwDgYDVQQIDAdURVNUSU5HMRAwDgYDVQQHDAdURVNUSU5HMRAwDgYD
VQQKDAdURVNUSU5HMRAwDgYDVQQLDAdURVNUSU5HMRAwDgYDVQQDDAdURVNUSU5H
MRYwFAYJKoZIhvcNAQkBFgdURVNUSU5HMCAXDTE2MDgxNTEyNTAwNFoYDzY0ODcw
NTA3MTI1MDA0WjB/MQswCQYDVQQGEwJWMjEQMA4GA1UECAwHVEVTVElORzEQMA4G
A1UEBwwHVEVTVElORzEQMA4GA1UECgwHVEVTVElORzEQMA4GA1UECwwHVEVTVElO
RzEQMA4GA1UEAwwHVEVTVElORzEWMBQGCSqGSIb3DQEJARYHVEVTVElORzBcMA0G
CSqGSIb3DQEBAQUAA0sAMEgCQQDFGeGTGepVDwgLm5rFx8khAhbod6g3Xg7vU3M9
lzowBeAOS6bpN8lnBEXo3U2brxB+okbRhNuSj3VQ4raX0iL1AgMBAAGjUDBOMB0G
A1UdDgQWBBRE81DrJv6nBXAF3JP4a3LTtwkp8TAfBgNVHSMEGDAWgBRE81DrJv6n
BXAF3JP4a3LTtwkp8TAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA0EAkGX6
sQvwHqNOdwise45dU8NvXwZsoqSQ2tdxrkB+SnKqEsMnRh/yPCSgzFkQVt53sYuf
HK8gD/wifGC5z39YlQ==
-----END CERTIFICATE-----

10
testing/tls/key.pem Normal file
View File

@ -0,0 +1,10 @@
-----BEGIN PRIVATE KEY-----
MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAxRnhkxnqVQ8IC5ua
xcfJIQIW6HeoN14O71NzPZc6MAXgDkum6TfJZwRF6N1Nm68QfqJG0YTbko91UOK2
l9Ii9QIDAQABAkEAgumaywKWgyJ1vIgAt8bnzxW9M3BueT/u+YTa8Ril3EiOtxDl
/aRtVJ/62r64Ymtq8BvYcEiopFKrUUKPaTfIrQIhAPkd9nDq7B4WepF8+pB0CyR0
dpT0imCXooN4+utosrdDAiEAyowAwGcYULWiALgSi78gt7fRgp0GwTf80evFy/k0
DWcCIQC9+IxrVarT0v6LHgyRxfyNQ0b+lnFD8b6bldF7Xa8TswIgA/Ptg9O/Prv8
uGTfP8jwG4XD2fe0jQrJrVMbnhpz8JsCIHkkbC3ez+iPieasr8a+zEpreE8NjrEV
xs4xp6WZPsGp
-----END PRIVATE KEY-----

View File

@ -3,12 +3,14 @@ package transport
import (
"github.com/v2ray/v2ray-core/transport/internet/kcp"
"github.com/v2ray/v2ray-core/transport/internet/tcp"
"github.com/v2ray/v2ray-core/transport/internet/ws"
)
// Config for V2Ray transport layer.
type Config struct {
tcpConfig *tcp.Config
kcpConfig kcp.Config
wsConfig *ws.Config
}
// Apply applies this Config.
@ -17,5 +19,8 @@ func (this *Config) Apply() error {
this.tcpConfig.Apply()
}
this.kcpConfig.Apply()
if this.wsConfig != nil {
this.wsConfig.Apply()
}
return nil
}

View File

@ -7,12 +7,14 @@ import (
"github.com/v2ray/v2ray-core/transport/internet/kcp"
"github.com/v2ray/v2ray-core/transport/internet/tcp"
"github.com/v2ray/v2ray-core/transport/internet/ws"
)
func (this *Config) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
TCPConfig *tcp.Config `json:"tcpSettings"`
KCPConfig kcp.Config `json:"kcpSettings"`
WSConfig *ws.Config `json:"wsSettings"`
}
jsonConfig := &JsonConfig{
KCPConfig: kcp.DefaultConfig(),
@ -22,6 +24,6 @@ func (this *Config) UnmarshalJSON(data []byte) error {
}
this.tcpConfig = jsonConfig.TCPConfig
this.kcpConfig = jsonConfig.KCPConfig
this.wsConfig = jsonConfig.WSConfig
return nil
}

View File

@ -15,9 +15,10 @@ type Reusable interface {
type StreamConnectionType int
const (
StreamConnectionTypeRawTCP StreamConnectionType = 1
StreamConnectionTypeTCP StreamConnectionType = 2
StreamConnectionTypeKCP StreamConnectionType = 4
StreamConnectionTypeRawTCP StreamConnectionType = 1
StreamConnectionTypeTCP StreamConnectionType = 2
StreamConnectionTypeKCP StreamConnectionType = 4
StreamConnectionTypeWebSocket StreamConnectionType = 8
)
type StreamSecurityType int

View File

@ -50,6 +50,9 @@ func (this *StreamSettings) UnmarshalJSON(data []byte) error {
if jsonConfig.Network.HasNetwork(v2net.KCPNetwork) {
this.Type |= StreamConnectionTypeKCP
}
if jsonConfig.Network.HasNetwork(v2net.WSNetwork) {
this.Type |= StreamConnectionTypeWebSocket
}
if jsonConfig.Network.HasNetwork(v2net.TCPNetwork) {
this.Type |= StreamConnectionTypeTCP
}

View File

@ -20,9 +20,11 @@ var (
KCPDialer Dialer
RawTCPDialer Dialer
UDPDialer Dialer
WSDialer Dialer
)
func Dial(src v2net.Address, dest v2net.Destination, settings *StreamSettings) (Connection, error) {
var connection Connection
var err error
if dest.IsTCP() {
@ -31,6 +33,15 @@ func Dial(src v2net.Address, dest v2net.Destination, settings *StreamSettings) (
connection, err = TCPDialer(src, dest)
case settings.IsCapableOf(StreamConnectionTypeKCP):
connection, err = KCPDialer(src, dest)
case settings.IsCapableOf(StreamConnectionTypeWebSocket):
connection, err = WSDialer(src, dest)
/*Warning: Hours wasted: the following item must be last one
internet.StreamConnectionType have a default value of 1,
so the following attempt will catch all.
*/
case settings.IsCapableOf(StreamConnectionTypeRawTCP):
connection, err = RawTCPDialer(src, dest)
default:
@ -39,6 +50,7 @@ func Dial(src v2net.Address, dest v2net.Destination, settings *StreamSettings) (
if err != nil {
return nil, err
}
if settings.Security == StreamSecurityTypeNone {
return connection, nil
}

View File

@ -17,6 +17,7 @@ var (
KCPListenFunc ListenFunc
TCPListenFunc ListenFunc
RawTCPListenFunc ListenFunc
WSListenFunc ListenFunc
)
type ListenFunc func(address v2net.Address, port v2net.Port) (Listener, error)
@ -42,6 +43,8 @@ func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandle
listener, err = TCPListenFunc(address, port)
case settings.IsCapableOf(StreamConnectionTypeKCP):
listener, err = KCPListenFunc(address, port)
case settings.IsCapableOf(StreamConnectionTypeWebSocket):
listener, err = WSListenFunc(address, port)
case settings.IsCapableOf(StreamConnectionTypeRawTCP):
listener, err = RawTCPListenFunc(address, port)
default:

View File

@ -0,0 +1,22 @@
package ws
type Config struct {
ConnectionReuse bool
Path string
Pto string
Cert string
PrivKey string
DeveloperInsecureSkipVerify bool
}
func (this *Config) Apply() {
effectiveConfig = this
}
var (
effectiveConfig = &Config{
ConnectionReuse: true,
Path: "",
Pto: "",
}
)

View File

@ -0,0 +1,30 @@
package ws
import (
"encoding/json"
)
func (this *Config) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
ConnectionReuse bool `json:"connectionReuse"`
Path string `json:"Path"`
Pto string `json:"Pto"`
Cert string `json:"Cert"`
PrivKey string `json:"PrivKey"`
}
jsonConfig := &JsonConfig{
ConnectionReuse: true,
Path: "",
Pto: "",
}
if err := json.Unmarshal(data, jsonConfig); err != nil {
return err
}
this.ConnectionReuse = jsonConfig.ConnectionReuse
this.Path = jsonConfig.Path
this.Pto = jsonConfig.Pto
this.PrivKey = jsonConfig.PrivKey
this.Cert = jsonConfig.Cert
this.DeveloperInsecureSkipVerify = false
return nil
}

View File

@ -0,0 +1,99 @@
package ws
import (
"errors"
"io"
"net"
"time"
)
var (
ErrInvalidConn = errors.New("Invalid Connection.")
)
type ConnectionManager interface {
Recycle(string, *wsconn)
}
type Connection struct {
dest string
conn *wsconn
listener ConnectionManager
reusable bool
}
func NewConnection(dest string, conn *wsconn, manager ConnectionManager) *Connection {
return &Connection{
dest: dest,
conn: conn,
listener: manager,
reusable: effectiveConfig.ConnectionReuse,
}
}
func (this *Connection) Read(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, io.EOF
}
return this.conn.Read(b)
}
func (this *Connection) Write(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, io.ErrClosedPipe
}
return this.conn.Write(b)
}
func (this *Connection) Close() error {
if this == nil || this.conn == nil {
return io.ErrClosedPipe
}
if this.Reusable() {
this.listener.Recycle(this.dest, this.conn)
return nil
}
err := this.conn.Close()
this.conn = nil
return err
}
func (this *Connection) LocalAddr() net.Addr {
return this.conn.LocalAddr()
}
func (this *Connection) RemoteAddr() net.Addr {
return this.conn.RemoteAddr()
}
func (this *Connection) SetDeadline(t time.Time) error {
return this.conn.SetDeadline(t)
}
func (this *Connection) SetReadDeadline(t time.Time) error {
return this.conn.SetReadDeadline(t)
}
func (this *Connection) SetWriteDeadline(t time.Time) error {
return this.conn.SetWriteDeadline(t)
}
func (this *Connection) SetReusable(reusable bool) {
if !effectiveConfig.ConnectionReuse {
return
}
this.reusable = reusable
}
func (this *Connection) Reusable() bool {
return this.reusable
}
func (this *Connection) SysFd() (int, error) {
return getSysFd(this.conn)
}
func getSysFd(conn net.Conn) (int, error) {
return 0, ErrInvalidConn
}

View File

@ -0,0 +1,114 @@
package ws
import (
"net"
"sync"
"time"
"github.com/v2ray/v2ray-core/common/log"
"github.com/v2ray/v2ray-core/common/signal"
)
type AwaitingConnection struct {
conn *wsconn
expire time.Time
}
func (this *AwaitingConnection) Expired() bool {
return this.expire.Before(time.Now())
}
type ConnectionCache struct {
sync.Mutex
cache map[string][]*AwaitingConnection
cleanupOnce signal.Once
}
func NewConnectionCache() *ConnectionCache {
return &ConnectionCache{
cache: make(map[string][]*AwaitingConnection),
}
}
func (this *ConnectionCache) Cleanup() {
defer this.cleanupOnce.Reset()
for len(this.cache) > 0 {
time.Sleep(time.Second * 7)
this.Lock()
for key, value := range this.cache {
size := len(value)
changed := false
for i := 0; i < size; {
if value[i].Expired() {
value[i].conn.Close()
value[i] = value[size-1]
size--
changed = true
} else {
i++
}
}
if changed {
for i := size; i < len(value); i++ {
value[i] = nil
}
value = value[:size]
this.cache[key] = value
}
}
this.Unlock()
}
}
func (this *ConnectionCache) Recycle(dest string, conn *wsconn) {
this.Lock()
defer this.Unlock()
aconn := &AwaitingConnection{
conn: conn,
expire: time.Now().Add(time.Second * 7),
}
var list []*AwaitingConnection
if v, found := this.cache[dest]; found {
v = append(v, aconn)
list = v
} else {
list = []*AwaitingConnection{aconn}
}
this.cache[dest] = list
go this.cleanupOnce.Do(this.Cleanup)
}
func FindFirstValid(list []*AwaitingConnection) int {
for idx, conn := range list {
if !conn.Expired() && !conn.conn.connClosing {
return idx
}
go conn.conn.Close()
}
return -1
}
func (this *ConnectionCache) Get(dest string) net.Conn {
this.Lock()
defer this.Unlock()
list, found := this.cache[dest]
if !found {
return nil
}
firstValid := FindFirstValid(list)
if firstValid == -1 {
delete(this.cache, dest)
return nil
}
res := list[firstValid].conn
list = list[firstValid+1:]
this.cache[dest] = list
log.Debug("WS:Conn Cache used.")
return res
}

View File

@ -0,0 +1,139 @@
package ws
import (
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"github.com/gorilla/websocket"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/internet"
)
var (
globalCache = NewConnectionCache()
)
func Dial(src v2net.Address, dest v2net.Destination) (internet.Connection, error) {
log.Info("Dailing WS to ", dest)
if src == nil {
src = v2net.AnyIP
}
id := src.String() + "-" + dest.NetAddr()
var conn *wsconn
if dest.IsTCP() && effectiveConfig.ConnectionReuse {
connt := globalCache.Get(id)
if connt != nil {
conn = connt.(*wsconn)
}
}
if conn == nil {
var err error
conn, err = wsDial(src, dest)
if err != nil {
log.Warning("WS Dial failed:" + err.Error())
return nil, err
}
}
return NewConnection(id, conn, globalCache), nil
}
func init() {
internet.WSDialer = Dial
}
func wsDial(src v2net.Address, dest v2net.Destination) (*wsconn, error) {
commonDial := func(network, addr string) (net.Conn, error) {
return internet.DialToDest(src, dest)
}
tlsconf := &tls.Config{ServerName: dest.Address().Domain(), InsecureSkipVerify: effectiveConfig.DeveloperInsecureSkipVerify}
dialer := websocket.Dialer{NetDial: commonDial, ReadBufferSize: 65536, WriteBufferSize: 65536, TLSClientConfig: tlsconf}
effpto := calcPto(dest)
uri := func(dst v2net.Destination, pto string, path string) string {
return fmt.Sprintf("%v://%v/%v", pto, dst.NetAddr(), path)
}(dest, effpto, effectiveConfig.Path)
conn, resp, err := dialer.Dial(uri, nil)
if err != nil {
if resp != nil {
reason, reasonerr := ioutil.ReadAll(resp.Body)
log.Info(string(reason), reasonerr)
}
return nil, err
}
return func() internet.Connection {
connv2ray := &wsconn{wsc: conn, connClosing: false}
connv2ray.setup()
return connv2ray
}().(*wsconn), nil
}
func calcPto(dst v2net.Destination) string {
if effectiveConfig.Pto != "" {
return effectiveConfig.Pto
}
switch dst.Port().Value() {
/*
Since the value is not given explicitly,
We are guessing it now.
HTTP Port:
80
8080
8880
2052
2082
2086
2095
HTTPS Port:
443
2053
2083
2087
2096
8443
if the port you are using is not well-known,
specify it to avoid this process.
We will return "CRASH"turn "unknown" if we can't guess it, cause Dial to fail.
*/
case 80:
fallthrough
case 8080:
fallthrough
case 8880:
fallthrough
case 2052:
fallthrough
case 2082:
fallthrough
case 2086:
fallthrough
case 2095:
return "ws"
case 443:
fallthrough
case 2053:
fallthrough
case 2083:
fallthrough
case 2087:
fallthrough
case 2096:
fallthrough
case 8443:
return "wss"
default:
return "unknown"
}
}

View File

@ -0,0 +1,185 @@
package ws
import (
"errors"
"net"
"net/http"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/internet"
)
var (
ErrClosedListener = errors.New("Listener is closed.")
)
type ConnectionWithError struct {
conn net.Conn
err error
}
type WSListener struct {
sync.Mutex
acccepting bool
awaitingConns chan *ConnectionWithError
listener *StoppableListener
}
func ListenWS(address v2net.Address, port v2net.Port) (internet.Listener, error) {
l := &WSListener{
acccepting: true,
awaitingConns: make(chan *ConnectionWithError, 32),
}
err := l.listenws(address, port)
return l, err
}
func (wsl *WSListener) listenws(address v2net.Address, port v2net.Port) error {
http.HandleFunc("/"+effectiveConfig.Path, func(w http.ResponseWriter, r *http.Request) {
log.Warning("WS:WSListener->listenws->(HandleFunc,lambda 2)! Accepting websocket")
con, err := wsl.converttovws(w, r)
if err != nil {
log.Warning("WS:WSListener->listenws->(HandleFunc,lambda 2)!" + err.Error())
return
}
select {
case wsl.awaitingConns <- &ConnectionWithError{
conn: con,
err: err,
}:
log.Warning("WS:WSListener->listenws->(HandleFunc,lambda 2)! transferd websocket")
default:
if con != nil {
con.Close()
}
}
return
})
errchan := make(chan error)
listenerfunc := func() error {
ol, err := net.Listen("tcp", address.String()+":"+strconv.Itoa(int(port.Value())))
if err != nil {
return err
}
wsl.listener, err = NewStoppableListener(ol)
if err != nil {
return err
}
return http.Serve(wsl.listener, nil)
}
if effectiveConfig.Pto == "wss" {
listenerfunc = func() error {
var err error
wsl.listener, err = getstopableTLSlistener(effectiveConfig.Cert, effectiveConfig.PrivKey, address.String()+":"+strconv.Itoa(int(port.Value())))
if err != nil {
return err
}
return http.Serve(wsl.listener, nil)
}
}
go func() {
err := listenerfunc()
errchan <- err
return
}()
var err error
select {
case err = <-errchan:
case <-time.After(time.Second * 2):
//Should this listen fail after 2 sec, it could gone untracked.
}
if err != nil {
log.Error("WS:WSListener->listenws->ListenAndServe!" + err.Error())
}
return err
}
func (wsl *WSListener) converttovws(w http.ResponseWriter, r *http.Request) (*wsconn, error) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 65536,
WriteBufferSize: 65536,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return nil, err
}
wrapedConn := &wsconn{wsc: conn, connClosing: false}
wrapedConn.setup()
return wrapedConn, nil
}
func (this *WSListener) Accept() (internet.Connection, error) {
for this.acccepting {
select {
case connErr, open := <-this.awaitingConns:
log.Info("WSListener: conn accepted")
if !open {
return nil, ErrClosedListener
}
if connErr.err != nil {
return nil, connErr.err
}
return NewConnection("", connErr.conn.(*wsconn), this), nil
case <-time.After(time.Second * 2):
}
}
return nil, ErrClosedListener
}
func (this *WSListener) Recycle(dest string, conn *wsconn) {
this.Lock()
defer this.Unlock()
if !this.acccepting {
return
}
select {
case this.awaitingConns <- &ConnectionWithError{conn: conn}:
default:
conn.Close()
}
}
func (this *WSListener) Addr() net.Addr {
return nil
}
func (this *WSListener) Close() error {
this.Lock()
defer this.Unlock()
this.acccepting = false
this.listener.Stop()
close(this.awaitingConns)
for connErr := range this.awaitingConns {
if connErr.conn != nil {
go connErr.conn.Close()
}
}
return nil
}
func init() {
internet.WSListenFunc = ListenWS
}

View File

@ -0,0 +1,29 @@
package ws
import (
"errors"
"net"
)
type StoppableListener struct {
net.Listener //Wrapped listener
}
func NewStoppableListener(l net.Listener) (*StoppableListener, error) {
retval := &StoppableListener{}
retval.Listener = l
return retval, nil
}
var StoppedError = errors.New("Listener stopped")
func (sl *StoppableListener) Accept() (net.Conn, error) {
newConn, err := sl.Listener.Accept()
return newConn, err
}
func (sl *StoppableListener) Stop() {
sl.Listener.Close()
}

View File

@ -0,0 +1,18 @@
package ws
import "crypto/tls"
func getstopableTLSlistener(cert, key, listenaddr string) (*StoppableListener, error) {
cer, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, err
}
config := &tls.Config{Certificates: []tls.Certificate{cer}}
ln, err := tls.Listen("tcp", listenaddr, config)
if err != nil {
return nil, err
}
lns, err := NewStoppableListener(ln)
return lns, err
}

View File

@ -0,0 +1,33 @@
/*Package ws implements Websocket transport
Websocket transport implements a HTTP(S) compliable, surveillance proof transport method with plausible deniability.
To configure such a listener, set streamSettings to be ws. A http(s) listener will be listening at the port you have configured.
There is additional configure can be made at transport configure.
"wsSettings":{
"Path":"ws", // the path our ws handler bind
"Pto": "wss/ws", // the transport protocol we are using ws or wss(listen ws with tls)
"Cert":"cert.pem", // if you have configured to use wss, configure your cert here
"PrivKey":"priv.pem" //if you have configured to use wss, configure your privatekey here
}
To configure such a Dialer, set streamSettings to be ws.
There is additional configure can be made at transport configure.
"wsSettings":{
"Path":"ws", // the path our ws handler bind
"Pto": "wss/ws", // the transport protocol we are using ws or wss(listen ws with tls)
}
It is worth noting that accepting a non-valid cert is not supported as a self-signed or invalid cert can be a sign of a website that is not correctly configured and lead to additional investigation.
This transport was disscussed at
https://github.com/v2ray/v2ray-core/issues/224
*/
package ws

View File

@ -0,0 +1,206 @@
package ws_test
import (
"testing"
"time"
"github.com/v2ray/v2ray-core/testing/assert"
. "github.com/v2ray/v2ray-core/transport/internet/ws"
v2net "github.com/v2ray/v2ray-core/common/net"
)
func Test_Connect_ws(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "ws", Path: ""}).Apply()
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("echo.websocket.org"), 80))
assert.Error(err).IsNil()
conn.Write([]byte("echo"))
s := make(chan int)
go func() {
buf := make([]byte, 4)
conn.Read(buf)
str := string(buf)
if str != "echo" {
assert.Fail("Data mismatch")
}
s <- 0
}()
<-s
conn.Close()
}
func Test_Connect_wss(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "wss", Path: ""}).Apply()
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("echo.websocket.org"), 443))
assert.Error(err).IsNil()
conn.Write([]byte("echo"))
s := make(chan int)
go func() {
buf := make([]byte, 4)
conn.Read(buf)
str := string(buf)
if str != "echo" {
assert.Fail("Data mismatch")
}
s <- 0
}()
<-s
conn.Close()
}
func Test_Connect_wss_1_nil(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "wss", Path: ""}).Apply()
conn, err := Dial(nil, v2net.TCPDestination(v2net.DomainAddress("echo.websocket.org"), 443))
assert.Error(err).IsNil()
conn.Write([]byte("echo"))
s := make(chan int)
go func() {
buf := make([]byte, 4)
conn.Read(buf)
str := string(buf)
if str != "echo" {
assert.Fail("Data mismatch")
}
s <- 0
}()
<-s
conn.Close()
}
func Test_Connect_ws_guess(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "", Path: ""}).Apply()
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("echo.websocket.org"), 80))
assert.Error(err).IsNil()
conn.Write([]byte("echo"))
s := make(chan int)
go func() {
buf := make([]byte, 4)
conn.Read(buf)
str := string(buf)
if str != "echo" {
assert.Fail("Data mismatch")
}
s <- 0
}()
<-s
conn.Close()
}
func Test_Connect_wss_guess(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "", Path: ""}).Apply()
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("echo.websocket.org"), 443))
assert.Error(err).IsNil()
conn.Write([]byte("echo"))
s := make(chan int)
go func() {
buf := make([]byte, 4)
conn.Read(buf)
str := string(buf)
if str != "echo" {
assert.Fail("Data mismatch")
}
s <- 0
}()
<-s
conn.Close()
}
func Test_Connect_wss_guess_fail(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "", Path: ""}).Apply()
_, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("static.kkdev.org"), 443))
assert.Error(err).IsNotNil()
}
func Test_Connect_wss_guess_fail_port(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "", Path: ""}).Apply()
_, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("static.kkdev.org"), 179))
assert.Error(err).IsNotNil()
}
func Test_Connect_wss_guess_reuse(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "", Path: "", ConnectionReuse: true}).Apply()
i := 3
for i != 0 {
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("echo.websocket.org"), 443))
assert.Error(err).IsNil()
conn.Write([]byte("echo"))
s := make(chan int)
go func() {
buf := make([]byte, 4)
conn.Read(buf)
str := string(buf)
if str != "echo" {
assert.Fail("Data mismatch")
}
s <- 0
}()
<-s
if i == 0 {
conn.SetDeadline(time.Now())
conn.SetReadDeadline(time.Now())
conn.SetWriteDeadline(time.Now())
conn.SetReusable(false)
}
conn.Close()
i--
}
}
func Test_listenWSAndDial(t *testing.T) {
assert := assert.On(t)
(&Config{Pto: "ws", Path: "ws"}).Apply()
listen, err := ListenWS(v2net.DomainAddress("localhost"), 13142)
assert.Error(err).IsNil()
go func() {
conn, err := listen.Accept()
assert.Error(err).IsNil()
conn.Close()
conn, err = listen.Accept()
assert.Error(err).IsNil()
conn.Close()
conn, err = listen.Accept()
assert.Error(err).IsNil()
conn.Close()
listen.Close()
}()
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("localhost"), 13142))
assert.Error(err).IsNil()
conn.Close()
<-time.After(time.Second * 5)
conn, err = Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("localhost"), 13142))
assert.Error(err).IsNil()
conn.Close()
<-time.After(time.Second * 15)
conn, err = Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("localhost"), 13142))
assert.Error(err).IsNil()
conn.Close()
}
func Test_listenWSAndDial_TLS(t *testing.T) {
assert := assert.On(t)
go func() {
<-time.After(time.Second * 5)
assert.Fail("Too slow")
}()
(&Config{Pto: "wss", Path: "wss", ConnectionReuse: true, DeveloperInsecureSkipVerify: true, PrivKey: "./../../../testing/tls/key.pem", Cert: "./../../../testing/tls/cert.pem"}).Apply()
listen, err := ListenWS(v2net.DomainAddress("localhost"), 13143)
assert.Error(err).IsNil()
go func() {
conn, err := listen.Accept()
assert.Error(err).IsNil()
conn.Close()
listen.Close()
}()
conn, err := Dial(v2net.AnyIP, v2net.TCPDestination(v2net.DomainAddress("localhost"), 13143))
assert.Error(err).IsNil()
conn.Close()
}

View File

@ -0,0 +1,203 @@
package ws
import (
"bufio"
"io"
"net"
"sync"
"time"
"github.com/v2ray/v2ray-core/common/log"
"github.com/gorilla/websocket"
)
type wsconn struct {
wsc *websocket.Conn
readBuffer *bufio.Reader
connClosing bool
reusable bool
rlock *sync.Mutex
wlock *sync.Mutex
}
func (ws *wsconn) Read(b []byte) (n int, err error) {
ws.rlock.Lock()
n, err = ws.read(b)
ws.rlock.Unlock()
return n, err
}
func (ws *wsconn) read(b []byte) (n int, err error) {
if ws.connClosing {
return 0, io.EOF
}
n, err = ws.readNext(b)
return n, err
}
func (ws *wsconn) getNewReadBuffer() error {
_, r, err := ws.wsc.NextReader()
if err != nil {
log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
ws.connClosing = true
ws.Close()
return err
}
ws.readBuffer = bufio.NewReader(r)
return nil
}
func (ws *wsconn) readNext(b []byte) (n int, err error) {
if ws.readBuffer == nil {
err = ws.getNewReadBuffer()
if err != nil {
return 0, err
}
}
n, err = ws.readBuffer.Read(b)
if err == nil {
return n, err
}
if err == io.EOF {
ws.readBuffer = nil
if n == 0 {
return ws.readNext(b)
}
return n, nil
}
return n, err
}
func (ws *wsconn) Write(b []byte) (n int, err error) {
ws.wlock.Lock()
if ws.connClosing {
return 0, io.EOF
}
n, err = ws.write(b)
ws.wlock.Unlock()
return n, err
}
func (ws *wsconn) write(b []byte) (n int, err error) {
wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
if err != nil {
log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
ws.connClosing = true
ws.Close()
return 0, err
}
n, err = wr.Write(b)
if err != nil {
return 0, err
}
err = wr.Close()
if err != nil {
return 0, err
}
return n, err
}
func (ws *wsconn) Close() error {
ws.connClosing = true
ws.wlock.Lock()
ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
ws.wlock.Unlock()
err := ws.wsc.Close()
return err
}
func (ws *wsconn) LocalAddr() net.Addr {
return ws.wsc.LocalAddr()
}
func (ws *wsconn) RemoteAddr() net.Addr {
return ws.wsc.RemoteAddr()
}
func (ws *wsconn) SetDeadline(t time.Time) error {
return func() error {
errr := ws.SetReadDeadline(t)
errw := ws.SetWriteDeadline(t)
if errr == nil || errw == nil {
return nil
}
if errr != nil {
return errr
}
return errw
}()
}
func (ws *wsconn) SetReadDeadline(t time.Time) error {
return ws.wsc.SetReadDeadline(t)
}
func (ws *wsconn) SetWriteDeadline(t time.Time) error {
return ws.wsc.SetWriteDeadline(t)
}
func (ws *wsconn) setup() {
ws.connClosing = false
/*
https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
Both Read and write access are both exclusive.
And in both case it will need a lock.
*/
ws.rlock = &sync.Mutex{}
ws.wlock = &sync.Mutex{}
ws.pingPong()
}
func (ws *wsconn) Reusable() bool {
return ws.reusable && !ws.connClosing
}
func (ws *wsconn) SetReusable(reusable bool) {
if !effectiveConfig.ConnectionReuse {
return
}
ws.reusable = reusable
}
func (ws *wsconn) pingPong() {
pongRcv := make(chan int, 0)
ws.wsc.SetPongHandler(func(data string) error {
pongRcv <- 0
return nil
})
go func() {
for !ws.connClosing {
ws.wlock.Lock()
ws.wsc.WriteMessage(websocket.PingMessage, nil)
ws.wlock.Unlock()
tick := time.After(time.Second * 3)
select {
case <-pongRcv:
break
case <-tick:
if !ws.connClosing {
log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
}
ws.Close()
}
<-time.After(time.Second * 27)
}
return
}()
}