From 90c6113dfc5c7b5f91f07002f1a5aaa0801559b6 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 4 Apr 2018 17:20:45 +0200 Subject: [PATCH] handle transport errors in mux session --- app/proxyman/mux/frame.go | 1 + app/proxyman/mux/mux.go | 36 ++++++++++++++++++++++++++++++++++-- app/proxyman/mux/writer.go | 13 +++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/app/proxyman/mux/frame.go b/app/proxyman/mux/frame.go index 3c56d6044..ac32d3cad 100644 --- a/app/proxyman/mux/frame.go +++ b/app/proxyman/mux/frame.go @@ -15,6 +15,7 @@ const ( SessionStatusKeep SessionStatus = 0x02 SessionStatusEnd SessionStatus = 0x03 SessionStatusKeepAlive SessionStatus = 0x04 + SessionStatusError SessionStatus = 0x05 ) const ( diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index a94c416de..93fb1b1c2 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -146,12 +146,14 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { } s.transferType = transferType writer := NewWriter(s.ID, dest, output, transferType) - defer writer.Close() defer s.Close() newError("dispatching request to ", dest).WithContext(ctx).WriteToLog() if err := buf.Copy(s.input, writer); err != nil { newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog() + writer.Error() + } else { + writer.Close() } } @@ -214,6 +216,18 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader return nil } +func (m *Client) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error { + if s, found := m.sessionManager.Get(meta.SessionID); found { + s.output.CloseError() + s.input.CloseError() + s.Close() + } + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + func (m *Client) fetchOutput() { defer m.done.Close() @@ -237,6 +251,8 @@ func (m *Client) fetchOutput() { err = m.handleStatusNew(meta, reader) case SessionStatusKeep: err = m.handleStatusKeep(meta, reader) + case SessionStatusError: + err = m.handleStatusError(meta, reader) default: newError("unknown status: ", meta.SessionStatus).AtError().WriteToLog() return @@ -294,8 +310,10 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { writer := NewResponseWriter(s.ID, output, s.transferType) if err := buf.Copy(s.input, writer); err != nil { newError("session ", s.ID, " ends.").Base(err).WithContext(ctx).WriteToLog() + writer.Error() + } else { + writer.Close() } - writer.Close() s.Close() } @@ -353,6 +371,18 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered return nil } +func (w *ServerWorker) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error { + if s, found := w.sessionManager.Get(meta.SessionID); found { + s.input.CloseError() + s.output.CloseError() + s.Close() + } + if meta.Option.Has(OptionData) { + return drain(reader) + } + return nil +} + func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error { meta, err := ReadMetadata(reader) if err != nil { @@ -368,6 +398,8 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead err = w.handleStatusNew(ctx, meta, reader) case SessionStatusKeep: err = w.handleStatusKeep(meta, reader) + case SessionStatusError: + err = w.handleStatusError(meta, reader) default: return newError("unknown status: ", meta.SessionStatus).AtError() } diff --git a/app/proxyman/mux/writer.go b/app/proxyman/mux/writer.go index b043909f3..be2e2d90c 100644 --- a/app/proxyman/mux/writer.go +++ b/app/proxyman/mux/writer.go @@ -112,3 +112,16 @@ func (w *Writer) Close() error { w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame)) return nil } + +func (w *Writer) Error() error { + meta := FrameMetadata{ + SessionID: w.id, + SessionStatus: SessionStatusError, + } + + frame := buf.New() + common.Must(meta.WriteTo(frame)) + + w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame)) + return nil +}