diff --git a/common/task/task.go b/common/task/task.go new file mode 100644 index 000000000..50f79cfac --- /dev/null +++ b/common/task/task.go @@ -0,0 +1,41 @@ +package task + +import ( + "sync" +) + +type Task interface { + Execute() error +} + +type ParallelExecutor struct { + sync.Mutex + tasks sync.WaitGroup + errors []error +} + +func (pe *ParallelExecutor) track(err error) { + if err == nil { + return + } + + pe.Lock() + pe.errors = append(pe.errors, err) + pe.Unlock() +} + +func (pe *ParallelExecutor) Execute(task Task) { + pe.tasks.Add(1) + go func() { + pe.track(task.Execute()) + pe.tasks.Done() + }() +} + +func (pe *ParallelExecutor) Wait() { + pe.tasks.Wait() +} + +func (pe *ParallelExecutor) Errors() []error { + return pe.errors +} diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 9809b882b..d313feaef 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -15,13 +15,75 @@ import ( v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/serial" + "v2ray.com/core/common/task" "v2ray.com/core/common/uuid" "v2ray.com/core/proxy" "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/ray" ) +type requestProcessor struct { + session *encoding.ServerSession + request *protocol.RequestHeader + input io.Reader + output ray.OutputStream +} + +func (r *requestProcessor) Execute() error { + defer r.output.Close() + + bodyReader := r.session.DecodeRequestBody(r.request, r.input) + defer bodyReader.Release() + + if err := buf.PipeUntilEOF(bodyReader, r.output); err != nil { + log.Debug("VMess|Inbound: Error when sending data to outbound: ", err) + return err + } + + return nil +} + +type responseProcessor struct { + session *encoding.ServerSession + request *protocol.RequestHeader + response *protocol.ResponseHeader + input ray.InputStream + output io.Writer +} + +func (r *responseProcessor) Execute() error { + defer r.input.Release() + r.session.EncodeResponseHeader(r.response, r.output) + + bodyWriter := r.session.EncodeResponseBody(r.request, r.output) + + // Optimize for small response packet + if data, err := r.input.Read(); err == nil { + if err := bodyWriter.Write(data); err != nil { + return err + } + + if bufferedWriter, ok := r.output.(*bufio.BufferedWriter); ok { + bufferedWriter.SetBuffered(false) + } + + if err := buf.PipeUntilEOF(r.input, bodyWriter); err != nil { + log.Debug("VMess|Inbound: Error when sending data to downstream: ", err) + return err + } + } + + if r.request.Option.Has(protocol.RequestOptionChunkStream) { + if err := bodyWriter.Write(buf.NewLocal(8)); err != nil { + return err + } + } + + return nil +} + type userByEmail struct { sync.RWMutex cache map[string]*protocol.User @@ -176,24 +238,17 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { defer input.Close() defer output.Release() - var readFinish sync.Mutex - readFinish.Lock() - userSettings := request.User.GetSettings() connReader.SetTimeOut(userSettings.PayloadReadTimeout) reader.SetBuffered(false) - go func() { - bodyReader := session.DecodeRequestBody(request, reader) - if err := buf.PipeUntilEOF(bodyReader, input); err != nil { - log.Debug("VMess|Inbound: Error when sending data to outbound: ", err) - connection.SetReusable(false) - } - bodyReader.Release() - - input.Close() - readFinish.Unlock() - }() + var executor task.ParallelExecutor + executor.Execute(&requestProcessor{ + session: session, + request: request, + input: reader, + output: input, + }) writer := bufio.NewWriter(connection) defer writer.Release() @@ -206,34 +261,24 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { response.Option.Set(protocol.ResponseOptionConnectionReuse) } - session.EncodeResponseHeader(response, writer) + executor.Execute(&responseProcessor{ + session: session, + request: request, + response: response, + input: output, + output: writer, + }) - bodyWriter := session.EncodeResponseBody(request, writer) - - // Optimize for small response packet - if data, err := output.Read(); err == nil { - if err := bodyWriter.Write(data); err != nil { - connection.SetReusable(false) - } - - writer.SetBuffered(false) - - if err := buf.PipeUntilEOF(output, bodyWriter); err != nil { - log.Debug("VMess|Inbound: Error when sending data to downstream: ", err) - connection.SetReusable(false) - } + executor.Wait() + if err := writer.Flush(); err != nil { + connection.SetReusable(false) } - output.Release() - if request.Option.Has(protocol.RequestOptionChunkStream) { - if err := bodyWriter.Write(buf.NewLocal(8)); err != nil { - connection.SetReusable(false) - } - } - writer.Flush() - bodyWriter.Release() - readFinish.Lock() + errors := executor.Errors() + if len(errors) > 0 { + connection.SetReusable(false) + } } type Factory struct{}