From 72fb5a256c7f39ee9681c7f102a87cf6f047cc8d Mon Sep 17 00:00:00 2001 From: v2ray Date: Thu, 2 Jun 2016 21:34:25 +0200 Subject: [PATCH] send reuse option in header --- common/protocol/headers.go | 34 +++++++++++++++++++++++++++++--- common/protocol/headers_test.go | 34 ++++++++++++++++++++++++++++++++ common/protocol/raw/client.go | 4 +++- common/protocol/raw/server.go | 2 +- proxy/vmess/inbound/inbound.go | 14 +++++++++---- proxy/vmess/outbound/outbound.go | 20 ++++++++++++++----- 6 files changed, 94 insertions(+), 14 deletions(-) create mode 100644 common/protocol/headers_test.go diff --git a/common/protocol/headers.go b/common/protocol/headers.go index e45a51e70..6ffd6987e 100644 --- a/common/protocol/headers.go +++ b/common/protocol/headers.go @@ -13,13 +13,22 @@ const ( ) const ( - RequestOptionChunkStream = RequestOption(0x01) + RequestOptionChunkStream = RequestOption(0x01) + RequestOptionConnectionReuse = RequestOption(0x02) ) type RequestOption byte -func (this RequestOption) IsChunkStream() bool { - return (this & RequestOptionChunkStream) == RequestOptionChunkStream +func (this RequestOption) Has(option RequestOption) bool { + return (this & option) == option +} + +func (this *RequestOption) Set(option RequestOption) { + *this = (*this | option) +} + +func (this *RequestOption) Clear(option RequestOption) { + *this = (*this & (^option)) } type RequestHeader struct { @@ -38,9 +47,28 @@ func (this *RequestHeader) Destination() v2net.Destination { return v2net.TCPDestination(this.Address, this.Port) } +type ResponseOption byte + +var ( + ResponseOptionConnectionReuse = ResponseOption(1) +) + +func (this *ResponseOption) Set(option ResponseOption) { + *this = (*this | option) +} + +func (this ResponseOption) Has(option ResponseOption) bool { + return (this | option) == option +} + +func (this *ResponseOption) Clear(option ResponseOption) { + *this = (*this & (^option)) +} + type ResponseCommand interface{} type ResponseHeader struct { + Option ResponseOption Command ResponseCommand } diff --git a/common/protocol/headers_test.go b/common/protocol/headers_test.go new file mode 100644 index 000000000..da7cf4342 --- /dev/null +++ b/common/protocol/headers_test.go @@ -0,0 +1,34 @@ +package protocol_test + +import ( + "testing" + + . "github.com/v2ray/v2ray-core/common/protocol" + "github.com/v2ray/v2ray-core/testing/assert" +) + +func TestRequestOptionSet(t *testing.T) { + assert := assert.On(t) + + option := new(RequestOption) + assert.Bool(option.Has(RequestOptionChunkStream)).IsFalse() + + option.Set(RequestOptionChunkStream) + assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue() + + option.Set(RequestOptionConnectionReuse) + assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue() + assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue() +} + +func TestRequestOptionClear(t *testing.T) { + assert := assert.On(t) + + option := new(RequestOption) + option.Set(RequestOptionChunkStream) + option.Set(RequestOptionConnectionReuse) + + option.Clear(RequestOptionChunkStream) + assert.Bool(option.Has(RequestOptionChunkStream)).IsFalse() + assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue() +} diff --git a/common/protocol/raw/client.go b/common/protocol/raw/client.go index d5e89179d..c1bcfc2b9 100644 --- a/common/protocol/raw/client.go +++ b/common/protocol/raw/client.go @@ -118,7 +118,9 @@ func (this *ClientSession) DecodeResponseHeader(reader io.Reader) (*protocol.Res return nil, transport.ErrorCorruptedPacket } - header := new(protocol.ResponseHeader) + header := &protocol.ResponseHeader{ + Option: protocol.ResponseOption(buffer.Value[1]), + } if buffer.Value[2] != 0 { cmdId := buffer.Value[2] diff --git a/common/protocol/raw/server.go b/common/protocol/raw/server.go index 7449030e8..5996b05a9 100644 --- a/common/protocol/raw/server.go +++ b/common/protocol/raw/server.go @@ -159,7 +159,7 @@ func (this *ServerSession) EncodeResponseHeader(header *protocol.ResponseHeader, encryptionWriter := crypto.NewCryptionWriter(aesStream, writer) this.responseWriter = encryptionWriter - encryptionWriter.Write([]byte{this.responseHeader, 0x00}) + encryptionWriter.Write([]byte{this.responseHeader, byte(header.Option)}) err := MarshalCommand(header.Command, encryptionWriter) if err != nil { encryptionWriter.Write([]byte{0x00, 0x00}) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 16e97c2d9..89c4ee5fe 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -17,6 +17,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" + "github.com/v2ray/v2ray-core/transport" "github.com/v2ray/v2ray-core/transport/hub" ) @@ -145,7 +146,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "") log.Debug("VMessIn: Received request for ", request.Destination()) - if request.Option.IsChunkStream() { + if request.Option.Has(protocol.RequestOptionConnectionReuse) { connection.SetReusable(true) } @@ -161,10 +162,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { userSettings := protocol.GetUserSettings(request.User.Level) connReader.SetTimeOut(userSettings.PayloadReadTimeout) reader.SetCached(false) + go func() { bodyReader := session.DecodeRequestBody(reader) var requestReader v2io.Reader - if request.Option.IsChunkStream() { + if request.Option.Has(protocol.RequestOptionChunkStream) { requestReader = vmessio.NewAuthChunkReader(bodyReader) } else { requestReader = v2io.NewAdaptiveReader(bodyReader) @@ -186,6 +188,10 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { Command: this.generateCommand(request), } + if request.Option.Has(protocol.RequestOptionConnectionReuse) && transport.IsConnectionReusable() { + response.Option.Set(protocol.ResponseOptionConnectionReuse) + } + session.EncodeResponseHeader(response, writer) bodyWriter := session.EncodeResponseBody(writer) @@ -193,7 +199,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { // Optimize for small response packet if data, err := output.Read(); err == nil { var v2writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) - if request.Option.IsChunkStream() { + if request.Option.Has(protocol.RequestOptionChunkStream) { v2writer = vmessio.NewAuthChunkWriter(v2writer) } @@ -207,7 +213,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { } output.Release() - if request.Option.IsChunkStream() { + if request.Option.Has(protocol.RequestOptionChunkStream) { v2writer.Write(alloc.NewSmallBuffer().Clear()) } v2writer.Release() diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 9a07b741d..52ec05016 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -14,6 +14,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" + "github.com/v2ray/v2ray-core/transport" "github.com/v2ray/v2ray-core/transport/hub" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -49,7 +50,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination) defer conn.Close() - if request.Option.IsChunkStream() { + + if transport.IsConnectionReusable() { + request.Option.Set(protocol.RequestOptionConnectionReuse) conn.SetReusable(true) } @@ -79,7 +82,7 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn bodyWriter := session.EncodeRequestBody(writer) var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) - if request.Option.IsChunkStream() { + if request.Option.Has(protocol.RequestOptionChunkStream) { streamWriter = vmessio.NewAuthChunkWriter(streamWriter) } streamWriter.Write(payload) @@ -90,8 +93,11 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn conn.SetReusable(false) } - if request.Option.IsChunkStream() { - streamWriter.Write(alloc.NewSmallBuffer().Clear()) + if request.Option.Has(protocol.RequestOptionChunkStream) { + err := streamWriter.Write(alloc.NewSmallBuffer().Clear()) + if err != nil { + conn.SetReusable(false) + } } streamWriter.Release() return @@ -110,11 +116,15 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con } go this.handleCommand(dest, header.Command) + if !header.Option.Has(protocol.ResponseOptionConnectionReuse) { + conn.SetReusable(false) + } + reader.SetCached(false) decryptReader := session.DecodeResponseBody(reader) var bodyReader v2io.Reader - if request.Option.IsChunkStream() { + if request.Option.Has(protocol.RequestOptionChunkStream) { bodyReader = vmessio.NewAuthChunkReader(decryptReader) } else { bodyReader = v2io.NewAdaptiveReader(decryptReader)