diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 16cb6ae9e..c3606f9c2 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -207,11 +207,46 @@ func pipe(reader *Reader, writer buf.Writer) error { } } +func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *Reader) error { + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + +func (m *Client) handleStatusNew(meta *FrameMetadata, reader *Reader) error { + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + +func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error { + if !meta.Option.Has(OptionData) { + return nil + } + + if s, found := m.sessionManager.Get(meta.SessionID); found { + return pipe(reader, s.output) + } + return drain(reader) +} + +func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *Reader) error { + if s, found := m.sessionManager.Get(meta.SessionID); found { + s.CloseDownlink() + s.output.Close() + } + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + func (m *Client) fetchOutput() { defer m.cancel() reader := NewReader(m.inboundRay.InboundOutput()) -L: for { meta, err := reader.ReadMetadata() if err != nil { @@ -219,35 +254,23 @@ L: break } - var drainData bool switch meta.SessionStatus { case SessionStatusKeepAlive: - drainData = true + err = m.handleStatueKeepAlive(meta, reader) case SessionStatusEnd: - if s, found := m.sessionManager.Get(meta.SessionID); found { - s.CloseDownlink() - s.output.Close() - } - drainData = true + err = m.handleStatusEnd(meta, reader) case SessionStatusNew: - drainData = true + err = m.handleStatusNew(meta, reader) case SessionStatusKeep: - if !meta.Option.Has(OptionData) { - break - } - if s, found := m.sessionManager.Get(meta.SessionID); found { - if err := pipe(reader, s.output); err != nil { - log.Trace(newError("failed to pipe data").Base(err)) - break L - } - } + err = m.handleStatusKeep(meta, reader) + default: + log.Trace(newError("unknown status: ", meta.SessionStatus).AtWarning()) + return } - if drainData && meta.Option.Has(OptionData) { - if err := drain(reader); err != nil { - log.Trace(newError("failed to drain data").Base(err)) - break - } + if err != nil { + log.Trace(newError("failed to process data").Base(err)) + return } } } @@ -300,12 +323,63 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { s.CloseDownlink() } +func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *Reader) error { + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + +func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *Reader) error { + log.Trace(newError("received request for ", meta.Target)) + inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) + if err != nil { + if meta.Option.Has(OptionData) { + drain(reader) + } + return newError("failed to dispatch request.").Base(err) + } + s := &Session{ + input: inboundRay.InboundOutput(), + output: inboundRay.InboundInput(), + parent: w.sessionManager, + ID: meta.SessionID, + } + w.sessionManager.Add(s) + go handle(ctx, s, w.outboundRay.OutboundOutput()) + if meta.Option.Has(OptionData) { + return pipe(reader, s.output) + } + return nil +} + +func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) error { + if !meta.Option.Has(OptionData) { + return nil + } + if s, found := w.sessionManager.Get(meta.SessionID); found { + return pipe(reader, s.output) + } + return drain(reader) +} + +func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *Reader) error { + if s, found := w.sessionManager.Get(meta.SessionID); found { + s.CloseUplink() + s.output.Close() + } + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + func (w *ServerWorker) run(ctx context.Context) { input := w.outboundRay.OutboundInput() reader := NewReader(input) defer w.sessionManager.Close() -L: + for { select { case <-ctx.Done(): @@ -319,56 +393,23 @@ L: return } - var drainData bool switch meta.SessionStatus { case SessionStatusKeepAlive: - drainData = true + err = w.handleStatusKeepAlive(meta, reader) case SessionStatusEnd: - if s, found := w.sessionManager.Get(meta.SessionID); found { - s.CloseUplink() - s.output.Close() - } - drainData = true + err = w.handleStatusEnd(meta, reader) case SessionStatusNew: - log.Trace(newError("received request for ", meta.Target)) - inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) - if err != nil { - log.Trace(newError("failed to dispatch request.").Base(err)) - drainData = true - break - } - s := &Session{ - input: inboundRay.InboundOutput(), - output: inboundRay.InboundInput(), - parent: w.sessionManager, - ID: meta.SessionID, - } - w.sessionManager.Add(s) - go handle(ctx, s, w.outboundRay.OutboundOutput()) - if !meta.Option.Has(OptionData) { - break - } - if err := pipe(reader, s.output); err != nil { - log.Trace(newError("failed to read data").Base(err)) - break L - } + err = w.handleStatusNew(ctx, meta, reader) case SessionStatusKeep: - if !meta.Option.Has(OptionData) { - break - } - if s, found := w.sessionManager.Get(meta.SessionID); found { - if err := pipe(reader, s.output); err != nil { - log.Trace(newError("failed to read data").Base(err)) - break L - } - } + err = w.handleStatusKeep(meta, reader) + default: + log.Trace(newError("unknown status: ", meta.SessionStatus).AtWarning()) + return } - if meta.Option.Has(OptionData) && drainData { - if err := drain(reader); err != nil { - log.Trace(newError("failed to drain data").Base(err)) - break - } + if err != nil { + log.Trace(newError("failed to process data").Base(err)) + return } } }