diff --git a/common/protocol/headers.go b/common/protocol/headers.go index ab4c64396..2308a2f23 100644 --- a/common/protocol/headers.go +++ b/common/protocol/headers.go @@ -32,6 +32,13 @@ type RequestHeader struct { Port v2net.Port } +func (this *RequestHeader) Destination() v2net.Destination { + if this.Command == RequestCommandUDP { + return v2net.UDPDestination(this.Address, this.Port) + } + return v2net.TCPDestination(this.Address, this.Port) +} + type ResponseCommand interface{} type ResponseHeader struct { diff --git a/common/protocol/raw/commands.go b/common/protocol/raw/commands.go index e4e2fb2cb..a7a6aa9fd 100644 --- a/common/protocol/raw/commands.go +++ b/common/protocol/raw/commands.go @@ -19,6 +19,10 @@ var ( ) func MarshalCommand(command interface{}, writer io.Writer) error { + if command == nil { + return ErrorUnknownCommand + } + var cmdId byte var factory CommandFactory switch command.(type) { @@ -29,7 +33,7 @@ func MarshalCommand(command interface{}, writer io.Writer) error { return ErrorUnknownCommand } - buffer := alloc.NewSmallBuffer() + buffer := alloc.NewSmallBuffer().Clear() err := factory.Marshal(command, buffer) if err != nil { return err diff --git a/common/protocol/raw/commands_test.go b/common/protocol/raw/commands_test.go index d1198319a..84dae43e3 100644 --- a/common/protocol/raw/commands_test.go +++ b/common/protocol/raw/commands_test.go @@ -1,9 +1,9 @@ package raw_test import ( - "bytes" "testing" + "github.com/v2ray/v2ray-core/common/alloc" netassert "github.com/v2ray/v2ray-core/common/net/testing/assert" "github.com/v2ray/v2ray-core/common/protocol" . "github.com/v2ray/v2ray-core/common/protocol/raw" @@ -23,11 +23,11 @@ func TestSwitchAccount(t *testing.T) { ValidMin: 16, } - buffer := bytes.NewBuffer(make([]byte, 0, 1024)) + buffer := alloc.NewBuffer().Clear() err := MarshalCommand(sa, buffer) assert.Error(err).IsNil() - cmd, err := UnmarshalCommand(1, buffer.Bytes()) + cmd, err := UnmarshalCommand(1, buffer.Value[2:]) assert.Error(err).IsNil() sa2, ok := cmd.(*protocol.CommandSwitchAccount) diff --git a/common/protocol/raw/server.go b/common/protocol/raw/server.go index ad3ee409e..35d1bb7f8 100644 --- a/common/protocol/raw/server.go +++ b/common/protocol/raw/server.go @@ -24,6 +24,12 @@ type ServerSession struct { responseWriter io.Writer } +func NewServerSession(validator protocol.UserValidator) *ServerSession { + return &ServerSession{ + userValidator: validator, + } +} + func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.RequestHeader, error) { buffer := alloc.NewSmallBuffer() defer buffer.Release() @@ -134,14 +140,17 @@ func (this *ServerSession) EncodeResponseHeader(header *protocol.ResponseHeader, responseBodyKey := md5.Sum(this.requestBodyKey) responseBodyIV := md5.Sum(this.requestBodyIV) this.responseBodyKey = responseBodyKey[:] - this.requestBodyIV = responseBodyIV[:] + this.responseBodyIV = responseBodyIV[:] aesStream := crypto.NewAesEncryptionStream(this.responseBodyKey, this.responseBodyIV) encryptionWriter := crypto.NewCryptionWriter(aesStream, writer) this.responseWriter = encryptionWriter encryptionWriter.Write([]byte{this.responseHeader, 0x00}) - MarshalCommand(header.Command, encryptionWriter) + err := MarshalCommand(header.Command, encryptionWriter) + if err != nil { + encryptionWriter.Write([]byte{0x00, 0x00}) + } } func (this *ServerSession) EncodeResponseBody(writer io.Writer) io.Writer { diff --git a/proxy/vmess/inbound/command.go b/proxy/vmess/inbound/command.go index 72f31205a..0f234ca42 100644 --- a/proxy/vmess/inbound/command.go +++ b/proxy/vmess/inbound/command.go @@ -1,20 +1,12 @@ package inbound import ( - "hash/fnv" - - "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" + "github.com/v2ray/v2ray-core/common/protocol" "github.com/v2ray/v2ray-core/common/serial" - "github.com/v2ray/v2ray-core/proxy/vmess/command" - "github.com/v2ray/v2ray-core/proxy/vmess/protocol" ) -func (this *VMessInboundHandler) generateCommand(request *protocol.VMessRequest, buffer *alloc.Buffer) { - cmd := byte(0) - commandBytes := alloc.NewSmallBuffer().Clear() - defer commandBytes.Release() - +func (this *VMessInboundHandler) generateCommand(request *protocol.RequestHeader) protocol.ResponseCommand { if this.features != nil && this.features.Detour != nil { tag := this.features.Detour.ToTag @@ -25,29 +17,19 @@ func (this *VMessInboundHandler) generateCommand(request *protocol.VMessRequest, if availableMin > 255 { availableMin = 255 } - cmd = byte(1) + log.Info("VMessIn: Pick detour handler for port ", inboundHandler.Port(), " for ", availableMin, " minutes.") user := inboundHandler.GetUser(request.User.Email) - saCmd := &command.SwitchAccount{ + return &protocol.CommandSwitchAccount{ Port: inboundHandler.Port(), ID: user.ID.UUID(), AlterIds: serial.Uint16Literal(len(user.AlterIDs)), Level: user.Level, ValidMin: byte(availableMin), } - saCmd.Marshal(commandBytes) } } } - if cmd == 0 || commandBytes.Len()+4 > 256 { - buffer.AppendBytes(byte(0), byte(0)) - } else { - buffer.AppendBytes(cmd, byte(commandBytes.Len()+4)) - fnv1hash := fnv.New32a() - fnv1hash.Write(commandBytes.Value) - hashValue := fnv1hash.Sum32() - buffer.AppendBytes(byte(hashValue>>24), byte(hashValue>>16), byte(hashValue>>8), byte(hashValue)) - buffer.Append(commandBytes.Value) - } + return nil } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 6643649c2..cdeb16b6c 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -1,19 +1,16 @@ package inbound import ( - "crypto/md5" - "io" "sync" "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/app/proxyman" - "github.com/v2ray/v2ray-core/common/alloc" - v2crypto "github.com/v2ray/v2ray-core/common/crypto" v2io "github.com/v2ray/v2ray-core/common/io" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" proto "github.com/v2ray/v2ray-core/common/protocol" + raw "github.com/v2ray/v2ray-core/common/protocol/raw" "github.com/v2ray/v2ray-core/common/serial" "github.com/v2ray/v2ray-core/common/uuid" "github.com/v2ray/v2ray-core/proxy" @@ -122,9 +119,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { defer connection.Close() connReader := v2net.NewTimeOutReader(16, connection) - requestReader := protocol.NewVMessRequestReader(this.clients) - request, err := requestReader.Read(connReader) + reader := v2io.NewBufferedReader(connReader) + session := raw.NewServerSession(this.clients) + + request, err := session.DecodeRequestHeader(reader) if err != nil { log.Access(connection.RemoteAddr(), serial.StringLiteral(""), log.AccessRejected, serial.StringLiteral(err.Error())) log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err) @@ -142,30 +141,42 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { userSettings := proto.GetUserSettings(request.User.Level) connReader.SetTimeOut(userSettings.PayloadReadTimeout) - go handleInput(request, connReader, input, &readFinish) + reader.SetCached(false) + go func() { + defer close(input) + defer readFinish.Unlock() + bodyReader := session.DecodeRequestBody(reader) + var requestReader v2io.Reader + if request.Option.IsChunkStream() { + requestReader = vmessio.NewAuthChunkReader(bodyReader) + } else { + requestReader = v2io.NewAdaptiveReader(bodyReader) + } + v2io.ReaderToChan(input, requestReader) + }() - responseKey := md5.Sum(request.RequestKey) - responseIV := md5.Sum(request.RequestIV) + writer := v2io.NewBufferedWriter(connection) - aesStream := v2crypto.NewAesEncryptionStream(responseKey[:], responseIV[:]) - responseWriter := v2crypto.NewCryptionWriter(aesStream, connection) + response := &proto.ResponseHeader{ + Command: this.generateCommand(request), + } + + session.EncodeResponseHeader(response, writer) + + bodyWriter := session.EncodeResponseBody(writer) // Optimize for small response packet - buffer := alloc.NewLargeBuffer().Clear() - defer buffer.Release() - buffer.AppendBytes(request.ResponseHeader, byte(0)) - this.generateCommand(request, buffer) - if data, open := <-output; open { - if request.IsChunkStream() { + if request.Option.IsChunkStream() { vmessio.Authenticate(data) } - buffer.Append(data.Value) + bodyWriter.Write(data.Value) data.Release() - responseWriter.Write(buffer.Value) + + writer.SetCached(false) go func(finish *sync.Mutex) { - var writer v2io.Writer = v2io.NewAdaptiveWriter(responseWriter) - if request.IsChunkStream() { + var writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) + if request.Option.IsChunkStream() { writer = vmessio.NewAuthChunkWriter(writer) } v2io.ChanToWriter(writer, output) @@ -178,21 +189,6 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { readFinish.Lock() } -func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) { - defer close(input) - defer finish.Unlock() - - aesStream := v2crypto.NewAesDecryptionStream(request.RequestKey, request.RequestIV) - descriptionReader := v2crypto.NewCryptionReader(aesStream, reader) - var requestReader v2io.Reader - if request.IsChunkStream() { - requestReader = vmessio.NewAuthChunkReader(descriptionReader) - } else { - requestReader = v2io.NewAdaptiveReader(descriptionReader) - } - v2io.ReaderToChan(input, requestReader) -} - func init() { internal.MustRegisterInboundHandlerCreator("vmess", func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {