diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 65c8debec..f2a4265df 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -211,40 +211,43 @@ func (m *Client) fetchOutput() { defer m.cancel() reader := NewReader(m.inboundRay.InboundOutput()) +L: for { meta, err := reader.ReadMetadata() if err != nil { log.Trace(newError("failed to read metadata").Base(err)) break } - if meta.SessionStatus == SessionStatusKeepAlive { - if meta.Option.Has(OptionData) { - if err := drain(reader); err != nil { - log.Trace(newError("failed to read data").Base(err)) - break + + var drainData bool + switch meta.SessionStatus { + case SessionStatusKeepAlive: + drainData = true + case SessionStatusEnd: + if s, found := m.sessionManager.Get(meta.SessionID); found { + s.CloseDownlink() + s.output.Close() + } + drainData = true + case SessionStatusNew: + drainData = true + case SessionStatusKeep: + if !meta.Option.Has(OptionData) { + break + } + if s, found := m.sessionManager.Get(meta.SessionID); found { + if err := pipe(reader, s.output); err != nil { + log.Trace(newError("failed to pipe data").Base(err)) + break L } } - continue } - s, found := m.sessionManager.Get(meta.SessionID) - if found && meta.SessionStatus == SessionStatusEnd { - s.CloseDownlink() - s.output.Close() - } - if !meta.Option.Has(OptionData) { - continue - } - - if found { - err = pipe(reader, s.output) - } else { - err = drain(reader) - } - - if err != nil { - log.Trace(newError("failed to read data").Base(err)) - break + if drainData && meta.Option.Has(OptionData) { + if err := drain(reader); err != nil { + log.Trace(newError("failed to drain data").Base(err)) + break + } } } } @@ -300,6 +303,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { func (w *ServerWorker) run(ctx context.Context) { input := w.outboundRay.OutboundInput() reader := NewReader(input) +L: for { select { case <-ctx.Done(): @@ -313,30 +317,25 @@ func (w *ServerWorker) run(ctx context.Context) { return } - if meta.SessionStatus == SessionStatusKeepAlive { - if meta.Option.Has(OptionData) { - if err := drain(reader); err != nil { - log.Trace(newError("failed to read data").Base(err)) - break - } + var drainData bool + switch meta.SessionStatus { + case SessionStatusKeepAlive: + drainData = true + case SessionStatusEnd: + if s, found := w.sessionManager.Get(meta.SessionID); found { + s.CloseUplink() + s.output.Close() } - continue - } - - s, found := w.sessionManager.Get(meta.SessionID) - if found && meta.SessionStatus == SessionStatusEnd { - s.CloseUplink() - s.output.Close() - } - - if meta.SessionStatus == SessionStatusNew { + drainData = true + case SessionStatusNew: log.Trace(newError("received request for ", meta.Target)) inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) if err != nil { log.Trace(newError("failed to dispatch request.").Base(err)) - continue + drainData = true + break } - s = &Session{ + s := &Session{ input: inboundRay.InboundOutput(), output: inboundRay.InboundInput(), parent: w.sessionManager, @@ -344,21 +343,30 @@ func (w *ServerWorker) run(ctx context.Context) { } w.sessionManager.Add(s) go handle(ctx, s, w.outboundRay.OutboundOutput()) + if !meta.Option.Has(OptionData) { + break + } + if err := pipe(reader, s.output); err != nil { + log.Trace(newError("failed to read data").Base(err)) + break L + } + case SessionStatusKeep: + if !meta.Option.Has(OptionData) { + break + } + if s, found := w.sessionManager.Get(meta.SessionID); found { + if err := pipe(reader, s.output); err != nil { + log.Trace(newError("failed to read data").Base(err)) + break L + } + } } - if !meta.Option.Has(OptionData) { - continue - } - - if s != nil { - err = pipe(reader, s.output) - } else { - err = drain(reader) - } - - if err != nil { - log.Trace(newError("failed to read data").Base(err)) - break + if meta.Option.Has(OptionData) && drainData { + if err := drain(reader); err != nil { + log.Trace(newError("failed to drain data").Base(err)) + break + } } } } diff --git a/app/proxyman/mux/status.go b/app/proxyman/mux/status.go deleted file mode 100644 index 84d2da281..000000000 --- a/app/proxyman/mux/status.go +++ /dev/null @@ -1,3 +0,0 @@ -package mux - -type statusHandler func(meta *FrameMetadata) error