diff --git a/app/proxyman/mux/frame.go b/app/proxyman/mux/frame.go index 89e02905a..df4dad731 100644 --- a/app/proxyman/mux/frame.go +++ b/app/proxyman/mux/frame.go @@ -106,6 +106,7 @@ func (f FrameMetadata) AsSupplier() buf.Supplier { length += nDomain + 2 } } + serial.Uint16ToBytes(uint16(length), lengthBytes[:0]) return length + 2, nil } diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 248da9700..c28571e0d 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -83,7 +83,7 @@ func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRa client, err := NewClient(m.proxy, m.dialer, m) if err != nil { - return err + return errors.Base(err).Message("Proxyman|Mux|ClientManager: Failed to create client.") } m.clients = append(m.clients, client) client.Dispatch(ctx, outboundRay) @@ -119,25 +119,17 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client ctx, cancel := context.WithCancel(context.Background()) ctx = proxy.ContextWithTarget(ctx, muxCoolDestination) pipe := ray.NewRay(ctx) - err := p.Process(ctx, pipe, dialer) - if err != nil { - cancel() - return nil, err - } - return &Client{ + go p.Process(ctx, pipe, dialer) + c := &Client{ sessions: make(map[uint16]*session, 256), inboundRay: pipe, ctx: ctx, cancel: cancel, manager: m, - }, nil -} - -func (m *Client) isFullyOccupied() bool { - m.access.RLock() - defer m.access.RUnlock() - - return len(m.sessions) >= maxParallel + count: 0, + } + go c.fetchOutput() + return c, nil } func (m *Client) remove(id uint16) { @@ -162,20 +154,28 @@ func (m *Client) Closed() bool { } } -func (m *Client) fetchInput(ctx context.Context, s *session) { +func fetchInput(ctx context.Context, s *session, output buf.Writer) { dest, _ := proxy.TargetFromContext(ctx) writer := &Writer{ dest: dest, id: s.id, - writer: m.inboundRay.InboundInput(), + writer: output, + } + defer writer.Close() + defer s.closeUplink() + + log.Info("Proxyman|Mux|Client: Dispatching request to ", dest) + data, _ := s.input.ReadTimeout(time.Millisecond * 500) + if data != nil { + if err := writer.Write(data); err != nil { + log.Info("Proxyman|Mux|Client: Failed to write first payload: ", err) + return + } } _, timer := signal.CancelAfterInactivity(ctx, time.Minute*5) if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil { log.Info("Proxyman|Mux|Client: Failed to fetch all input: ", err) } - - writer.Close() - s.closeUplink() } func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool { @@ -205,7 +205,7 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool id: id, } m.sessions[id] = s - go m.fetchInput(ctx, s) + go fetchInput(ctx, s, m.inboundRay.InboundInput()) return true } @@ -214,6 +214,7 @@ func (m *Client) fetchOutput() { for { meta, err := reader.ReadMetadata() if err != nil { + log.Warning("Proxyman|Mux|Client: Failed to read metadata: ", err) break } m.access.RLock() @@ -291,6 +292,9 @@ func (w *ServerWorker) remove(id uint16) { } func (w *ServerWorker) handle(ctx context.Context, s *session) { + writer := NewResponseWriter(s.id, w.outboundRay.OutboundOutput()) + defer writer.Close() + for { select { case <-ctx.Done(): @@ -300,7 +304,7 @@ func (w *ServerWorker) handle(ctx context.Context, s *session) { if err != nil { return } - w.outboundRay.OutboundOutput().Write(data) + writer.Write(data) } } } @@ -330,6 +334,7 @@ func (w *ServerWorker) run(ctx context.Context) { } if meta.SessionStatus == SessionStatusNew { + log.Info("Proxyman|Mux|Server: Received request for ", meta.Target) inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) if err != nil { log.Info("Proxyman|Mux: Failed to dispatch request: ", err) @@ -354,7 +359,8 @@ func (w *ServerWorker) run(ctx context.Context) { break } if s != nil { - s.output.Write(data) + if err := s.output.Write(data); err != nil { + } } if !more { break diff --git a/app/proxyman/mux/reader.go b/app/proxyman/mux/reader.go index 04971d459..912850613 100644 --- a/app/proxyman/mux/reader.go +++ b/app/proxyman/mux/reader.go @@ -4,6 +4,7 @@ import ( "io" "v2ray.com/core/common/buf" + "v2ray.com/core/common/errors" "v2ray.com/core/common/serial" ) @@ -28,6 +29,9 @@ func (r *Reader) ReadMetadata() (*FrameMetadata, error) { return nil, err } metaLen := serial.BytesToUint16(b.Bytes()) + if metaLen > 512 { + return nil, errors.New("Proxyman|Mux|Reader: Invalid metalen ", metaLen) + } b.Clear() if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, int(metaLen))); err != nil { return nil, err diff --git a/app/proxyman/mux/writer.go b/app/proxyman/mux/writer.go index dd72313dd..9927b3008 100644 --- a/app/proxyman/mux/writer.go +++ b/app/proxyman/mux/writer.go @@ -21,6 +21,14 @@ func NewWriter(id uint16, dest net.Destination, writer buf.Writer) *Writer { } } +func NewResponseWriter(id uint16, writer buf.Writer) *Writer { + return &Writer{ + id: id, + writer: writer, + followup: true, + } +} + func (w *Writer) writeInternal(b *buf.Buffer) error { meta := FrameMetadata{ SessionID: w.id, @@ -74,7 +82,6 @@ func (w *Writer) Write(b *buf.Buffer) error { func (w *Writer) Close() { meta := FrameMetadata{ SessionID: w.id, - Target: w.dest, SessionStatus: SessionStatusEnd, }