From 10ce629c02346a358d898b65cdd1eba09fc7bd09 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 26 Apr 2017 21:43:53 +0200 Subject: [PATCH] close input on error --- app/proxyman/mux/mux.go | 62 +++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 2ebbe289f..4236577a1 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -79,12 +79,12 @@ type Client struct { ctx context.Context cancel context.CancelFunc manager *ClientManager - session2Remove chan uint16 concurrency uint32 } var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527)) +// NewClient creates a new mux.Client. func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) ctx = proxy.ContextWithTarget(ctx, muxCoolDestination) @@ -96,7 +96,6 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client ctx: ctx, cancel: cancel, manager: m, - session2Remove: make(chan uint16, 16), concurrency: m.config.Concurrency, } go c.fetchOutput() @@ -266,6 +265,7 @@ type Server struct { dispatcher dispatcher.Interface } +// NewServer creates a new mux.Server. func NewServer(ctx context.Context) *Server { s := &Server{} space := app.SpaceFromContext(ctx) @@ -361,6 +361,31 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *Reader) erro return nil } +func (w *ServerWorker) handleFrame(ctx context.Context, reader *Reader) error { + meta, err := reader.ReadMetadata() + if err != nil { + return newError("failed to read metadata").Base(err) + } + + switch meta.SessionStatus { + case SessionStatusKeepAlive: + err = w.handleStatusKeepAlive(meta, reader) + case SessionStatusEnd: + err = w.handleStatusEnd(meta, reader) + case SessionStatusNew: + err = w.handleStatusNew(ctx, meta, reader) + case SessionStatusKeep: + err = w.handleStatusKeep(meta, reader) + default: + return newError("unknown status: ", meta.SessionStatus).AtWarning() + } + + if err != nil { + return newError("failed to process data").Base(err) + } + return nil +} + func (w *ServerWorker) run(ctx context.Context) { input := w.outboundRay.OutboundInput() reader := NewReader(input) @@ -372,31 +397,14 @@ func (w *ServerWorker) run(ctx context.Context) { case <-ctx.Done(): return default: - } - - meta, err := reader.ReadMetadata() - if err != nil { - log.Trace(newError("failed to read metadata").Base(err)) - return - } - - switch meta.SessionStatus { - case SessionStatusKeepAlive: - err = w.handleStatusKeepAlive(meta, reader) - case SessionStatusEnd: - err = w.handleStatusEnd(meta, reader) - case SessionStatusNew: - err = w.handleStatusNew(ctx, meta, reader) - case SessionStatusKeep: - err = w.handleStatusKeep(meta, reader) - default: - log.Trace(newError("unknown status: ", meta.SessionStatus).AtWarning()) - return - } - - if err != nil { - log.Trace(newError("failed to process data").Base(err)) - return + err := w.handleFrame(ctx, reader) + if err != nil { + if errors.Cause(err) != io.EOF { + log.Trace(newError("unexpected EOF").Base(err)) + input.CloseError() + } + return + } } } }