1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-22 01:57:12 -05:00

remove context from mux.Client

This commit is contained in:
Darien Raymond 2018-02-08 17:46:50 +01:00
parent 32f1ba7c7e
commit b5facc0ca5
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
2 changed files with 23 additions and 28 deletions

View File

@ -14,6 +14,7 @@ import (
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/ray" "v2ray.com/core/transport/ray"
) )
@ -74,8 +75,7 @@ func (m *ClientManager) onClientFinish() {
type Client struct { type Client struct {
sessionManager *SessionManager sessionManager *SessionManager
inboundRay ray.InboundRay inboundRay ray.InboundRay
ctx context.Context done *signal.Done
cancel context.CancelFunc
manager *ClientManager manager *ClientManager
concurrency uint32 concurrency uint32
} }
@ -85,26 +85,26 @@ var muxCoolPort = net.Port(9527)
// NewClient creates a new mux.Client. // NewClient creates a new mux.Client.
func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) { func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx := proxy.ContextWithTarget(context.Background(), net.TCPDestination(muxCoolAddress, muxCoolPort))
ctx = proxy.ContextWithTarget(ctx, net.TCPDestination(muxCoolAddress, muxCoolPort)) ctx, cancel := context.WithCancel(ctx)
pipe := ray.NewRay(ctx) pipe := ray.NewRay(ctx)
go func() {
if err := p.Process(ctx, pipe, dialer); err != nil {
cancel()
errors.New("failed to handler mux client connection").Base(err).WriteToLog()
}
}()
c := &Client{ c := &Client{
sessionManager: NewSessionManager(), sessionManager: NewSessionManager(),
inboundRay: pipe, inboundRay: pipe,
ctx: ctx, done: signal.NewDone(),
cancel: cancel,
manager: m, manager: m,
concurrency: m.config.Concurrency, concurrency: m.config.Concurrency,
} }
go func() {
if err := p.Process(ctx, pipe, dialer); err != nil {
errors.New("failed to handler mux client connection").Base(err).WriteToLog()
}
c.done.Close()
cancel()
}()
go c.fetchOutput() go c.fetchOutput()
go c.monitor() go c.monitor()
return c, nil return c, nil
@ -112,12 +112,7 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client
// Closed returns true if this Client is closed. // Closed returns true if this Client is closed.
func (m *Client) Closed() bool { func (m *Client) Closed() bool {
select { return m.done.Done()
case <-m.ctx.Done():
return true
default:
return false
}
} }
func (m *Client) monitor() { func (m *Client) monitor() {
@ -128,7 +123,7 @@ func (m *Client) monitor() {
for { for {
select { select {
case <-m.ctx.Done(): case <-m.done.C():
m.sessionManager.Close() m.sessionManager.Close()
m.inboundRay.InboundInput().Close() m.inboundRay.InboundInput().Close()
m.inboundRay.InboundOutput().CloseError() m.inboundRay.InboundOutput().CloseError()
@ -136,7 +131,8 @@ func (m *Client) monitor() {
case <-timer.C: case <-timer.C:
size := m.sessionManager.Size() size := m.sessionManager.Size()
if size == 0 && m.sessionManager.CloseIfNoSession() { if size == 0 && m.sessionManager.CloseIfNoSession() {
m.cancel() m.done.Close()
return
} }
} }
} }
@ -170,10 +166,8 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
return false return false
} }
select { if m.done.Done() {
case <-m.ctx.Done():
return false return false
default:
} }
s := sm.Allocate() s := sm.Allocate()
@ -226,7 +220,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader
} }
func (m *Client) fetchOutput() { func (m *Client) fetchOutput() {
defer m.cancel() defer m.done.Close()
reader := buf.NewBufferedReader(m.inboundRay.InboundOutput()) reader := buf.NewBufferedReader(m.inboundRay.InboundOutput())

View File

@ -93,8 +93,6 @@ func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
} }
} }
var zeroAddr net.Addr = &net.TCPAddr{IP: []byte{0, 0, 0, 0}, Port: 0}
// Dial implements proxy.Dialer.Dial(). // Dial implements proxy.Dialer.Dial().
func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) { func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) {
if h.senderSettings != nil { if h.senderSettings != nil {
@ -124,14 +122,17 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
return internet.Dial(ctx, dest) return internet.Dial(ctx, dest)
} }
// GetOutbound implements proxy.GetOutbound.
func (h *Handler) GetOutbound() proxy.Outbound { func (h *Handler) GetOutbound() proxy.Outbound {
return h.proxy return h.proxy
} }
// Start implements common.Runnable.
func (h *Handler) Start() error { func (h *Handler) Start() error {
return nil return nil
} }
// Close implements common.Runnable.
func (h *Handler) Close() error { func (h *Handler) Close() error {
return nil return nil
} }