1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-20 00:07:06 -05:00

close outbound connections when context is done

This commit is contained in:
Darien Raymond 2017-01-28 21:24:46 +01:00
parent 9139a87865
commit fab20bb0cf
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
12 changed files with 45 additions and 28 deletions

View File

@ -65,7 +65,14 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H
func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
ctx = proxy.ContextWithDialer(ctx, h) ctx = proxy.ContextWithDialer(ctx, h)
h.proxy.Process(ctx, outboundRay) err := h.proxy.Process(ctx, outboundRay)
// Ensure outbound ray is properly closed.
if err != nil {
outboundRay.OutboundOutput().CloseError()
} else {
outboundRay.OutboundOutput().Close()
}
outboundRay.OutboundInput().CloseError()
} }
func (h *Handler) Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, error) { func (h *Handler) Dial(ctx context.Context, dest v2net.Destination) (internet.Connection, error) {

View File

@ -1,5 +1,9 @@
package signal package signal
import (
"context"
)
func executeAndFulfill(f func() error, done chan<- error) { func executeAndFulfill(f func() error, done chan<- error) {
err := f() err := f()
if err != nil { if err != nil {
@ -14,17 +18,28 @@ func ExecuteAsync(f func() error) <-chan error {
return done return done
} }
func ErrorOrFinish2(c1, c2 <-chan error) error { func ErrorOrFinish1(ctx context.Context, c <-chan error) error {
select { select {
case <-ctx.Done():
return ctx.Err()
case err := <-c:
return err
}
}
func ErrorOrFinish2(ctx context.Context, c1, c2 <-chan error) error {
select {
case <-ctx.Done():
return ctx.Err()
case err, failed := <-c1: case err, failed := <-c1:
if failed { if failed {
return err return err
} }
return <-c2 return ErrorOrFinish1(ctx, c2)
case err, failed := <-c2: case err, failed := <-c2:
if failed { if failed {
return err return err
} }
return <-c1 return ErrorOrFinish1(ctx, c1)
} }
} }

View File

@ -1,6 +1,7 @@
package signal_test package signal_test
import ( import (
"context"
"errors" "errors"
"testing" "testing"
@ -16,7 +17,7 @@ func TestErrorOrFinish2_Error(t *testing.T) {
c := make(chan error, 1) c := make(chan error, 1)
go func() { go func() {
c <- ErrorOrFinish2(c1, c2) c <- ErrorOrFinish2(context.Background(), c1, c2)
}() }()
c1 <- errors.New("test") c1 <- errors.New("test")
@ -32,7 +33,7 @@ func TestErrorOrFinish2_Error2(t *testing.T) {
c := make(chan error, 1) c := make(chan error, 1)
go func() { go func() {
c <- ErrorOrFinish2(c1, c2) c <- ErrorOrFinish2(context.Background(), c1, c2)
}() }()
c2 <- errors.New("test") c2 <- errors.New("test")
@ -48,7 +49,7 @@ func TestErrorOrFinish2_NoneError(t *testing.T) {
c := make(chan error, 1) c := make(chan error, 1)
go func() { go func() {
c <- ErrorOrFinish2(c1, c2) c <- ErrorOrFinish2(context.Background(), c1, c2)
}() }()
close(c1) close(c1)
@ -71,7 +72,7 @@ func TestErrorOrFinish2_NoneError2(t *testing.T) {
c := make(chan error, 1) c := make(chan error, 1)
go func() { go func() {
c <- ErrorOrFinish2(c1, c2) c <- ErrorOrFinish2(context.Background(), c1, c2)
}() }()
close(c2) close(c2)

View File

@ -83,7 +83,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
inboundRay.InboundInput().CloseError() inboundRay.InboundInput().CloseError()
inboundRay.InboundOutput().CloseError() inboundRay.InboundOutput().CloseError()
log.Info("Dokodemo: Connection ends with ", err) log.Info("Dokodemo: Connection ends with ", err)

View File

@ -136,7 +136,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) erro
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("Freedom: Connection ending with ", err) log.Info("Freedom: Connection ending with ", err)
input.CloseError() input.CloseError()
output.CloseError() output.CloseError()

View File

@ -150,7 +150,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("HTTP|Server: Connection ends with: ", err) log.Info("HTTP|Server: Connection ends with: ", err)
ray.InboundInput().CloseError() ray.InboundInput().CloseError()
ray.InboundOutput().CloseError() ray.InboundOutput().CloseError()
@ -246,7 +246,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, rea
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("HTTP|Server: Connecton ending with ", err) log.Info("HTTP|Server: Connecton ending with ", err)
input.CloseError() input.CloseError()
output.CloseError() output.CloseError()

View File

@ -61,6 +61,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error
} }
log.Info("Shadowsocks|Client: Tunneling request to ", destination, " via ", server.Destination()) log.Info("Shadowsocks|Client: Tunneling request to ", destination, " via ", server.Destination())
defer conn.Close()
conn.SetReusable(false) conn.SetReusable(false)
request := &protocol.RequestHeader{ request := &protocol.RequestHeader{
@ -119,7 +120,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("Shadowsocks|Client: Connection ends with ", err) log.Info("Shadowsocks|Client: Connection ends with ", err)
outboundRay.OutboundInput().CloseError() outboundRay.OutboundInput().CloseError()
outboundRay.OutboundOutput().CloseError() outboundRay.OutboundOutput().CloseError()
@ -161,10 +162,8 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("Shadowsocks|Client: Connection ends with ", err) log.Info("Shadowsocks|Client: Connection ends with ", err)
outboundRay.OutboundInput().CloseError()
outboundRay.OutboundOutput().CloseError()
return err return err
} }

View File

@ -69,6 +69,8 @@ func (s *Server) Network() net.NetworkList {
} }
func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error { func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error {
conn.SetReusable(false)
switch network { switch network {
case net.Network_TCP: case net.Network_TCP:
return s.handleConnection(ctx, conn) return s.handleConnection(ctx, conn)
@ -132,8 +134,6 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
} }
func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error { func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error {
conn.SetReusable(false)
timedReader := net.NewTimeOutReader(16, conn) timedReader := net.NewTimeOutReader(16, conn)
bufferedReader := bufio.NewReader(timedReader) bufferedReader := bufio.NewReader(timedReader)
request, bodyReader, err := ReadTCPSession(s.user, bufferedReader) request, bodyReader, err := ReadTCPSession(s.user, bufferedReader)
@ -195,7 +195,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection)
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("Shadowsocks|Server: Connection ends with ", err) log.Info("Shadowsocks|Server: Connection ends with ", err)
ray.InboundInput().CloseError() ray.InboundInput().CloseError()
ray.InboundOutput().CloseError() ray.InboundOutput().CloseError()

View File

@ -108,10 +108,8 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error {
requestDone := signal.ExecuteAsync(requestFunc) requestDone := signal.ExecuteAsync(requestFunc)
responseDone := signal.ExecuteAsync(responseFunc) responseDone := signal.ExecuteAsync(responseFunc)
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("Socks|Client: Connection ends with ", err) log.Info("Socks|Client: Connection ends with ", err)
ray.OutboundInput().CloseError()
ray.OutboundOutput().CloseError()
return err return err
} }

View File

@ -137,10 +137,9 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
return err return err
} }
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("Socks|Server: Connection ends with ", err) log.Info("Socks|Server: Connection ends with ", err)
input.CloseError() input.CloseError()
output.CloseError() output.CloseError()

View File

@ -222,7 +222,7 @@ func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network,
return transferResponse(session, request, response, output, writer) return transferResponse(session, request, response, output, writer)
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("VMess|Inbound: Connection ending with ", err) log.Info("VMess|Inbound: Connection ending with ", err)
connection.SetReusable(false) connection.SetReusable(false)
input.CloseError() input.CloseError()

View File

@ -152,11 +152,9 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb
return nil return nil
}) })
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
log.Info("VMess|Outbound: Connection ending with ", err) log.Info("VMess|Outbound: Connection ending with ", err)
conn.SetReusable(false) conn.SetReusable(false)
input.CloseError()
output.CloseError()
return err return err
} }