diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 8a8194673..c3f72b00c 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -124,6 +124,32 @@ func (c *udpConn) updateActivity() { atomic.StoreInt64(&c.lastActivityTime, time.Now().Unix()) } +// ReadMultiBuffer implements buf.Reader +func (c *udpConn) ReadMultiBuffer() (buf.MultiBuffer, error) { + var payload buf.MultiBuffer + + select { + case in := <-c.input: + payload.Append(in) + case <-c.done.Wait(): + return nil, io.EOF + } + +L: + for { + select { + case in := <-c.input: + payload.Append(in) + case <-c.done.Wait(): + break L + default: + break L + } + } + + return payload, nil +} + func (c *udpConn) Read(buf []byte) (int, error) { select { case in := <-c.input: @@ -202,7 +228,7 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) { w.Lock() defer w.Unlock() - if conn, found := w.activeConn[id]; found { + if conn, found := w.activeConn[id]; found && !conn.done.Done() { return conn, true } diff --git a/app/proxyman/mux/frame.go b/app/proxyman/mux/frame.go index 8b78846d5..ba43d19b5 100644 --- a/app/proxyman/mux/frame.go +++ b/app/proxyman/mux/frame.go @@ -1,6 +1,7 @@ package mux import ( + "v2ray.com/core/common" "v2ray.com/core/common/bitmask" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" @@ -58,21 +59,21 @@ type FrameMetadata struct { func (f FrameMetadata) WriteTo(b *buf.Buffer) error { lenBytes := b.Bytes() - b.AppendBytes(0x00, 0x00) + common.Must2(b.AppendBytes(0x00, 0x00)) len0 := b.Len() if err := b.AppendSupplier(serial.WriteUint16(f.SessionID)); err != nil { return err } - b.AppendBytes(byte(f.SessionStatus), byte(f.Option)) + common.Must2(b.AppendBytes(byte(f.SessionStatus), byte(f.Option))) if f.SessionStatus == SessionStatusNew { switch f.Target.Network { case net.Network_TCP: - b.AppendBytes(byte(TargetNetworkTCP)) + common.Must2(b.AppendBytes(byte(TargetNetworkTCP))) case net.Network_UDP: - b.AppendBytes(byte(TargetNetworkUDP)) + common.Must2(b.AppendBytes(byte(TargetNetworkUDP))) } if err := addrParser.WriteAddressPort(b, f.Target.Address, f.Target.Port); err != nil { @@ -85,6 +86,8 @@ func (f FrameMetadata) WriteTo(b *buf.Buffer) error { return nil } +// ReadFrameFrom reads a FrameMetadata from the given buffer. +// Visible for testing only. func ReadFrameFrom(b *buf.Buffer) (*FrameMetadata, error) { if b.Len() < 4 { return nil, newError("insufficient buffer: ", b.Len())