1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-11-17 18:06:15 -05:00

close mux session on end

This commit is contained in:
Darien Raymond 2017-05-02 23:53:39 +02:00
parent d5f931ae8b
commit dc76e36ed7
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
2 changed files with 13 additions and 34 deletions

View File

@ -140,7 +140,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
s.transferType = transferType s.transferType = transferType
writer := NewWriter(s.ID, dest, output, transferType) writer := NewWriter(s.ID, dest, output, transferType)
defer writer.Close() defer writer.Close()
defer s.CloseUplink() defer s.Close()
log.Trace(newError("dispatching request to ", dest)) log.Trace(newError("dispatching request to ", dest))
data, _ := s.input.ReadTimeout(time.Millisecond * 500) data, _ := s.input.ReadTimeout(time.Millisecond * 500)
@ -207,8 +207,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error {
func (m *Client) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error { func (m *Client) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error {
if s, found := m.sessionManager.Get(meta.SessionID); found { if s, found := m.sessionManager.Get(meta.SessionID); found {
s.CloseDownlink() s.Close()
s.output.Close()
} }
if meta.Option.Has(OptionData) { if meta.Option.Has(OptionData) {
return drain(reader) return drain(reader)
@ -298,7 +297,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
log.Trace(newError("session ", s.ID, " ends: ").Base(err)) log.Trace(newError("session ", s.ID, " ends: ").Base(err))
} }
writer.Close() writer.Close()
s.CloseDownlink() s.Close()
} }
func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader io.Reader) error { func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader io.Reader) error {
@ -347,8 +346,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader io.Reader) e
func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error { func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error {
if s, found := w.sessionManager.Get(meta.SessionID); found { if s, found := w.sessionManager.Get(meta.SessionID); found {
s.CloseUplink() s.Close()
s.output.Close()
} }
if meta.Option.Has(OptionData) { if meta.Option.Has(OptionData) {
return drain(reader) return drain(reader)

View File

@ -115,37 +115,18 @@ func (m *SessionManager) Close() {
} }
type Session struct { type Session struct {
sync.Mutex
input ray.InputStream input ray.InputStream
output ray.OutputStream output ray.OutputStream
parent *SessionManager parent *SessionManager
ID uint16 ID uint16
uplinkClosed bool
downlinkClosed bool
transferType protocol.TransferType transferType protocol.TransferType
} }
func (s *Session) CloseUplink() { func (s *Session) Close() {
var allDone bool s.output.Close()
s.Lock() s.input.Close()
s.uplinkClosed = true
allDone = s.uplinkClosed && s.downlinkClosed
s.Unlock()
if allDone {
s.parent.Remove(s.ID) s.parent.Remove(s.ID)
} }
}
func (s *Session) CloseDownlink() {
var allDone bool
s.Lock()
s.downlinkClosed = true
allDone = s.uplinkClosed && s.downlinkClosed
s.Unlock()
if allDone {
s.parent.Remove(s.ID)
}
}
func (s *Session) NewReader(reader io.Reader) buf.Reader { func (s *Session) NewReader(reader io.Reader) buf.Reader {
if s.transferType == protocol.TransferTypeStream { if s.transferType == protocol.TransferTypeStream {