1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-06 17:36:40 -05:00
v2fly/app/reverse/portal.go

270 lines
5.5 KiB
Go
Raw Normal View History

2018-10-27 18:03:11 -04:00
package reverse
import (
"context"
"sync"
"time"
"google.golang.org/protobuf/proto"
2021-02-16 15:31:50 -05:00
"github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/mux"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/session"
"github.com/v2fly/v2ray-core/v5/common/task"
"github.com/v2fly/v2ray-core/v5/features/outbound"
"github.com/v2fly/v2ray-core/v5/transport"
"github.com/v2fly/v2ray-core/v5/transport/pipe"
2018-10-27 18:03:11 -04:00
)
type Portal struct {
2021-12-31 19:11:33 -05:00
ctx context.Context
2018-10-27 18:03:11 -04:00
ohm outbound.Manager
tag string
domain string
picker *StaticMuxPicker
client *mux.ClientManager
}
2021-12-31 19:11:33 -05:00
func NewPortal(ctx context.Context, config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
if config.Tag == "" {
2018-10-27 18:03:11 -04:00
return nil, newError("portal tag is empty")
}
if config.Domain == "" {
2018-10-27 18:03:11 -04:00
return nil, newError("portal domain is empty")
}
picker, err := NewStaticMuxPicker()
if err != nil {
return nil, err
}
return &Portal{
2021-12-31 19:11:33 -05:00
ctx: ctx,
2018-10-27 18:03:11 -04:00
ohm: ohm,
tag: config.Tag,
domain: config.Domain,
picker: picker,
client: &mux.ClientManager{
Picker: picker,
},
}, nil
}
func (p *Portal) Start() error {
2021-12-31 19:11:33 -05:00
return p.ohm.AddHandler(p.ctx, &Outbound{
2018-10-27 18:03:11 -04:00
portal: p,
2018-10-28 02:27:07 -04:00
tag: p.tag,
2018-10-27 18:03:11 -04:00
})
}
func (p *Portal) Close() error {
2021-12-31 19:11:33 -05:00
return p.ohm.RemoveHandler(p.ctx, p.tag)
2018-10-27 18:03:11 -04:00
}
2018-11-13 17:19:58 -05:00
func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
2018-10-27 18:03:11 -04:00
outboundMeta := session.OutboundFromContext(ctx)
if outboundMeta == nil {
return newError("outbound metadata not found").AtError()
}
2018-11-13 17:19:58 -05:00
if isDomain(outboundMeta.Target, p.domain) {
2018-10-28 04:08:43 -04:00
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
2018-10-27 18:03:11 -04:00
if err != nil {
return newError("failed to create mux client worker").Base(err).AtWarning()
}
2021-12-31 19:11:33 -05:00
worker, err := NewPortalWorker(ctx, muxClient)
2018-10-27 18:03:11 -04:00
if err != nil {
return newError("failed to create portal worker").Base(err)
}
2018-11-13 17:19:58 -05:00
p.picker.AddWorker(worker)
2018-10-27 18:03:11 -04:00
return nil
}
2018-11-13 17:19:58 -05:00
return p.client.Dispatch(ctx, link)
2018-10-27 18:03:11 -04:00
}
// Outbound is the handler a Portal registers with the outbound manager. It
// forwards every dispatched link to its portal.
type Outbound struct {
	// portal receives the links passed to Dispatch.
	portal *Portal
	// tag is the value reported by Tag.
	tag string
}
// Tag returns the tag this outbound was created with.
func (o *Outbound) Tag() string {
	return o.tag
}
2018-11-03 07:36:29 -04:00
func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
2018-10-27 18:03:11 -04:00
if err := o.portal.HandleConnection(ctx, link); err != nil {
newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
2018-12-31 15:25:10 -05:00
common.Interrupt(link.Writer)
2018-10-27 18:03:11 -04:00
}
}
// Start does nothing and always returns nil.
func (o *Outbound) Start() error {
	return nil
}
// Close does nothing and always returns nil.
func (o *Outbound) Close() error {
	return nil
}
// StaticMuxPicker keeps the list of portal workers and selects one of them
// for each dispatch (see PickAvailable).
type StaticMuxPicker struct {
	// access is held by cleanup, PickAvailable and AddWorker while they touch
	// workers.
	access sync.Mutex
	// workers is the current set of registered workers.
	workers []*PortalWorker
	// cTask periodically runs cleanup.
	cTask *task.Periodic
}
// NewStaticMuxPicker creates an empty picker and starts its periodic cleanup
// task, which runs cleanup every 30 seconds.
//
// An error from starting the task is returned to the caller instead of being
// discarded; in that case no picker is returned.
func NewStaticMuxPicker() (*StaticMuxPicker, error) {
	p := &StaticMuxPicker{}
	p.cTask = &task.Periodic{
		Execute:  p.cleanup,
		Interval: time.Second * 30,
	}
	if err := p.cTask.Start(); err != nil {
		return nil, err
	}
	return p, nil
}
// cleanup drops every worker that reports Closed from the worker list. It is
// the Execute callback of the picker's periodic task and always returns nil.
func (p *StaticMuxPicker) cleanup() error {
	p.access.Lock()
	defer p.access.Unlock()

	var alive []*PortalWorker
	for i := range p.workers {
		worker := p.workers[i]
		if worker.Closed() {
			continue
		}
		alive = append(alive, worker)
	}

	// Nothing was closed: keep the existing slice untouched.
	if len(alive) == len(p.workers) {
		return nil
	}

	p.workers = alive
	return nil
}
func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
p.access.Lock()
defer p.access.Unlock()
2018-10-28 02:27:07 -04:00
if len(p.workers) == 0 {
2018-10-27 18:03:11 -04:00
return nil, newError("empty worker list")
}
2021-11-27 01:32:07 -05:00
minIdx := -1
2018-10-28 02:27:07 -04:00
var minConn uint32 = 9999
for i, w := range p.workers {
2018-10-28 04:08:43 -04:00
if w.draining {
2018-10-28 02:27:07 -04:00
continue
2018-10-27 18:03:11 -04:00
}
2022-11-28 20:27:28 -05:00
if w.client.Closed() {
continue
}
2018-10-28 02:27:07 -04:00
if w.client.ActiveConnections() < minConn {
minConn = w.client.ActiveConnections()
minIdx = i
}
}
2018-10-28 04:08:43 -04:00
if minIdx == -1 {
for i, w := range p.workers {
if w.IsFull() {
continue
}
if w.client.ActiveConnections() < minConn {
minConn = w.client.ActiveConnections()
minIdx = i
}
}
}
2018-10-28 02:27:07 -04:00
if minIdx != -1 {
return p.workers[minIdx].client, nil
2018-10-27 18:03:11 -04:00
}
return nil, newError("no mux client worker available")
}
// AddWorker appends worker to the picker's worker list while holding the
// picker's lock.
func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
	p.access.Lock()
	p.workers = append(p.workers, worker)
	p.access.Unlock()
}
type PortalWorker struct {
2018-10-28 04:08:43 -04:00
client *mux.ClientWorker
control *task.Periodic
writer buf.Writer
reader buf.Reader
draining bool
2018-10-27 18:03:11 -04:00
}
2021-12-31 19:11:33 -05:00
func NewPortalWorker(ctx context.Context, client *mux.ClientWorker) (*PortalWorker, error) {
2018-10-27 18:03:11 -04:00
opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)
2018-10-28 02:27:07 -04:00
ctx = session.ContextWithOutbound(ctx, &session.Outbound{
Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
})
2018-11-03 07:36:29 -04:00
f := client.Dispatch(ctx, &transport.Link{
2018-10-27 18:03:11 -04:00
Reader: uplinkReader,
Writer: downlinkWriter,
})
if !f {
return nil, newError("unable to dispatch control connection")
}
w := &PortalWorker{
client: client,
reader: downlinkReader,
writer: uplinkWriter,
}
w.control = &task.Periodic{
Execute: w.heartbeat,
Interval: time.Second * 2,
}
w.control.Start()
return w, nil
}
func (w *PortalWorker) heartbeat() error {
if w.client.Closed() {
return newError("client worker stopped")
}
2018-10-28 04:08:43 -04:00
if w.draining || w.writer == nil {
2018-10-27 18:03:11 -04:00
return newError("already disposed")
}
msg := &Control{}
msg.FillInRandom()
2018-10-28 04:08:43 -04:00
if w.client.TotalConnections() > 256 {
w.draining = true
2018-10-27 18:03:11 -04:00
msg.State = Control_DRAIN
defer func() {
common.Close(w.writer)
2018-12-31 15:25:10 -05:00
common.Interrupt(w.reader)
2018-10-27 18:03:11 -04:00
w.writer = nil
}()
}
b, err := proto.Marshal(msg)
common.Must(err)
2018-11-18 13:36:36 -05:00
mb := buf.MergeBytes(nil, b)
2018-10-27 18:03:11 -04:00
return w.writer.WriteMultiBuffer(mb)
}
// IsFull reports whether the underlying mux client reports itself as full.
func (w *PortalWorker) IsFull() bool {
	return w.client.IsFull()
}
// Closed reports whether the underlying mux client is closed.
func (w *PortalWorker) Closed() bool {
	return w.client.Closed()
}