diff --git a/app/proxyman/mux/frame.go b/app/proxyman/mux/frame.go index 3d2ecc913..b86853797 100644 --- a/app/proxyman/mux/frame.go +++ b/app/proxyman/mux/frame.go @@ -16,6 +16,24 @@ const ( SessionStatusEnd SessionStatus = 0x03 ) +type Option byte + +const ( + OptionData Option = 0x01 +) + +func (o Option) Has(x Option) bool { + return (o & x) == x +} + +func (o *Option) Add(x Option) { + *o = (*o | x) +} + +func (o *Option) Clear(x Option) { + *o = (*o & (^x)) +} + type TargetNetwork byte const ( @@ -36,7 +54,7 @@ Frame format 2 bytes - length 2 bytes - session id 1 bytes - status -1 bytes - reserved +1 bytes - option 1 byte - network 2 bytes - port @@ -48,6 +66,7 @@ type FrameMetadata struct { SessionID uint16 SessionStatus SessionStatus Target net.Destination + Option Option } func (f FrameMetadata) AsSupplier() buf.Supplier { @@ -55,7 +74,7 @@ func (f FrameMetadata) AsSupplier() buf.Supplier { b = serial.Uint16ToBytes(uint16(0), b) // place holder for length b = serial.Uint16ToBytes(f.SessionID, b) - b = append(b, byte(f.SessionStatus), 0 /* reserved */) + b = append(b, byte(f.SessionStatus), byte(f.Option)) length := 4 if f.SessionStatus == SessionStatusNew { diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 51812f83d..7302a515d 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -1,19 +1,137 @@ package mux -import "v2ray.com/core/common/net" +import ( + "context" + "sync" + "time" + + "v2ray.com/core/common/buf" + "v2ray.com/core/common/signal" + "v2ray.com/core/proxy" + "v2ray.com/core/transport/ray" +) const ( maxParallel = 8 maxTotal = 128 ) -type mergerWorker struct { +type clientSession struct { + sync.Mutex + outboundRay ray.OutboundRay + parent *Client + id uint16 + uplinkClosed bool + downlinkClosed bool } -func (w *mergerWorker) isFull() bool { - return true +func (s *clientSession) checkAndRemove() { + s.Lock() + if s.uplinkClosed && s.downlinkClosed { + s.parent.remove(s.id) + } + s.Unlock() } -type Merger struct { - sessions map[net.Destination]mergerWorker +func (s *clientSession) closeUplink() { + s.Lock() + s.uplinkClosed = true + s.Unlock() + s.checkAndRemove() +} + +func (s *clientSession) closeDownlink() { + s.Lock() + s.downlinkClosed = true + s.Unlock() + s.checkAndRemove() +} + +type Client struct { + access sync.RWMutex + count uint16 + sessions map[uint16]*clientSession + inboundRay ray.InboundRay +} + +func (m *Client) IsFullyOccupied() bool { + m.access.RLock() + defer m.access.RUnlock() + + return len(m.sessions) >= maxParallel +} + +func (m *Client) IsFullyUsed() bool { + m.access.RLock() + defer m.access.RUnlock() + + return m.count >= maxTotal +} + +func (m *Client) remove(id uint16) { + m.access.Lock() + delete(m.sessions, id) + m.access.Unlock() +} + +func (m *Client) fetchInput(ctx context.Context, session *clientSession) { + dest, _ := proxy.TargetFromContext(ctx) + writer := &muxWriter{ + dest: dest, + id: session.id, + writer: m.inboundRay.InboundInput(), + } + _, timer := signal.CancelAfterInactivity(ctx, time.Minute*5) + buf.PipeUntilEOF(timer, session.outboundRay.OutboundInput(), writer) + writer.Close() + session.closeUplink() +} + +func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { + m.access.Lock() + defer m.access.Unlock() + + m.count++ + id := m.count + session := &clientSession{ + outboundRay: outboundRay, + parent: m, + id: id, + } + m.sessions[id] = session + go m.fetchInput(ctx, session) +} + +func (m *Client) fetchOutput() { + reader := NewReader(m.inboundRay.InboundOutput()) + for { + meta, err := reader.ReadMetadata() + if err != nil { + break + } + m.access.RLock() + session, found := m.sessions[meta.SessionID] + m.access.RUnlock() + if found && meta.SessionStatus == SessionStatusEnd { + session.closeDownlink() + } + if !meta.Option.Has(OptionData) { + continue + } + + for { + data, more, err := reader.Read() + if err != nil { + break + } + if found { + if err := session.outboundRay.OutboundOutput().Write(data); err != nil { + break + } + } + if !more { + break + } + } + } } diff --git a/app/proxyman/mux/writer.go b/app/proxyman/mux/writer.go index 1ae3cb668..04ed8037a 100644 --- a/app/proxyman/mux/writer.go +++ b/app/proxyman/mux/writer.go @@ -2,41 +2,76 @@ package mux import ( "v2ray.com/core/common/buf" + "v2ray.com/core/common/net" "v2ray.com/core/common/serial" ) type muxWriter struct { - meta *FrameMetadata - writer buf.Writer + id uint16 + dest net.Destination + writer buf.Writer + followup bool +} + +func (w *muxWriter) writeInternal(b *buf.Buffer) error { + meta := FrameMetadata{ + SessionID: w.id, + Target: w.dest, + } + if w.followup { + meta.SessionStatus = SessionStatusKeep + } else { + w.followup = true + meta.SessionStatus = SessionStatusNew + } + + if b.Len() > 0 { + meta.Option.Add(OptionData) + } + + frame := buf.New() + frame.AppendSupplier(meta.AsSupplier()) + + if b.Len() > 0 { + frame.AppendSupplier(serial.WriteUint16(0)) + lengthBytes := frame.BytesFrom(-2) + + nBytes, err := frame.Write(b.Bytes()) + if err != nil { + frame.Release() + return err + } + + serial.Uint16ToBytes(uint16(nBytes), lengthBytes[:0]) + b.SliceFrom(nBytes) + } + + return w.writer.Write(frame) } func (w *muxWriter) Write(b *buf.Buffer) error { - frame := buf.New() - frame.AppendSupplier(w.meta.AsSupplier()) - if w.meta.SessionStatus == SessionStatusNew { - w.meta.SessionStatus = SessionStatusKeep - } + defer b.Release() - frame.AppendSupplier(serial.WriteUint16(0)) - lengthBytes := frame.BytesFrom(-2) - - nBytes, err := frame.Write(b.Bytes()) - if err != nil { + if err := w.writeInternal(b); err != nil { return err } - - serial.Uint16ToBytes(uint16(nBytes), lengthBytes[:0]) - if err := w.writer.Write(frame); err != nil { - frame.Release() - b.Release() - return err + for !b.IsEmpty() { + if err := w.writeInternal(b); err != nil { + return err + } } - - b.SliceFrom(nBytes) - if !b.IsEmpty() { - return w.Write(b) - } - b.Release() - return nil } + +func (w *muxWriter) Close() { + meta := FrameMetadata{ + SessionID: w.id, + Target: w.dest, + SessionStatus: SessionStatusEnd, + } + + frame := buf.New() + frame.AppendSupplier(meta.AsSupplier()) + + w.writer.Write(frame) +}