diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 3b390b71c..673ae1104 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -94,13 +94,18 @@ func (m *ClientManager) onClientFinish() { m.access.Lock() defer m.access.Unlock() - nActive := 0 - for idx, client := range m.clients { - if nActive != idx && !client.Closed() { - m.clients[nActive] = client + if len(m.clients) < 10 { + return + } + + activeClients := make([]*Client, 0, len(m.clients)) + + for _, client := range m.clients { + if !client.Closed() { + activeClients = append(activeClients, client) } } - m.clients = m.clients[:nActive] + m.clients = activeClients } type Client struct { @@ -209,12 +214,40 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool return true } +func drain(reader *Reader) error { + for { + data, more, err := reader.Read() + if err != nil { + return err + } + data.Release() + if !more { + return nil + } + } +} + +func pipe(reader *Reader, writer buf.Writer) error { + for { + data, more, err := reader.Read() + if err != nil { + return err + } + if err := writer.Write(data); err != nil { + return err + } + if !more { + return nil + } + } +} + func (m *Client) fetchOutput() { reader := NewReader(m.inboundRay.InboundOutput()) for { meta, err := reader.ReadMetadata() if err != nil { - log.Warning("Proxyman|Mux|Client: Failed to read metadata: ", err) + log.Info("Proxyman|Mux|Client: Failed to read metadata: ", err) break } m.access.RLock() @@ -228,19 +261,15 @@ func (m *Client) fetchOutput() { continue } - for { - data, more, err := reader.Read() - if err != nil { - break - } - if found { - if err := s.output.Write(data); err != nil { - break - } - } - if !more { - break - } + if found { + err = pipe(reader, s.output) + } else { + err = drain(reader) + } + + if err != nil { + log.Info("Proxyman|Mux|Client: Failed to read data: ", err) + break } } } @@ -295,22 +324,9 @@ func handle(ctx context.Context, s *session, output buf.Writer) { writer := NewResponseWriter(s.id, output) defer writer.Close() - for { - select { - case <-ctx.Done(): - log.Debug("Proxyman|Mux|ServerWorker: Session ", s.id, " ends by context.") - return - default: - data, err := s.input.Read() - if err != nil { - log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err) - return - } - if err := writer.Write(data); err != nil { - log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err) - return - } - } + _, timer := signal.CancelAfterInactivity(ctx, time.Minute*30) + if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil { + log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err) } } @@ -357,20 +373,19 @@ func (w *ServerWorker) run(ctx context.Context) { go handle(ctx, s, w.outboundRay.OutboundOutput()) } - if meta.Option.Has(OptionData) { - for { - data, more, err := reader.Read() - if err != nil { - break - } - if s != nil { - if err := s.output.Write(data); err != nil { - } - } - if !more { - break - } - } + if !meta.Option.Has(OptionData) { + continue + } + + if s != nil { + err = pipe(reader, s.output) + } else { + err = drain(reader) + } + + if err != nil { + log.Info("Proxyman|Mux|ServerWorker: Failed to read data: ", err) + break } } }