diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index a0778c1e0..9e4abd3eb 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -140,7 +140,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { s.transferType = transferType writer := NewWriter(s.ID, dest, output, transferType) defer writer.Close() - defer s.CloseUplink() + defer s.Close() log.Trace(newError("dispatching request to ", dest)) data, _ := s.input.ReadTimeout(time.Millisecond * 500) @@ -207,8 +207,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error { func (m *Client) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error { if s, found := m.sessionManager.Get(meta.SessionID); found { - s.CloseDownlink() - s.output.Close() + s.Close() } if meta.Option.Has(OptionData) { return drain(reader) @@ -298,7 +297,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { log.Trace(newError("session ", s.ID, " ends: ").Base(err)) } writer.Close() - s.CloseDownlink() + s.Close() } func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader io.Reader) error { @@ -347,8 +346,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader io.Reader) e func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error { if s, found := w.sessionManager.Get(meta.SessionID); found { - s.CloseUplink() - s.output.Close() + s.Close() } if meta.Option.Has(OptionData) { return drain(reader) diff --git a/app/proxyman/mux/session.go b/app/proxyman/mux/session.go index 80bb1b3ce..60816dd11 100644 --- a/app/proxyman/mux/session.go +++ b/app/proxyman/mux/session.go @@ -115,36 +115,17 @@ func (m *SessionManager) Close() { } type Session struct { - sync.Mutex - input ray.InputStream - output ray.OutputStream - parent *SessionManager - ID uint16 - uplinkClosed bool - downlinkClosed bool - transferType protocol.TransferType + input ray.InputStream + output ray.OutputStream + parent *SessionManager + ID uint16 + transferType protocol.TransferType } -func (s *Session) CloseUplink() { - var allDone bool - s.Lock() - s.uplinkClosed = true - allDone = s.uplinkClosed && s.downlinkClosed - s.Unlock() - if allDone { - s.parent.Remove(s.ID) - } -} - -func (s *Session) CloseDownlink() { - var allDone bool - s.Lock() - s.downlinkClosed = true - allDone = s.uplinkClosed && s.downlinkClosed - s.Unlock() - if allDone { - s.parent.Remove(s.ID) - } +func (s *Session) Close() { + s.output.Close() + s.input.Close() + s.parent.Remove(s.ID) } func (s *Session) NewReader(reader io.Reader) buf.Reader {